You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 20:43:20 UTC
[1/2] storm git commit: Merge branch 'STORM-1837-2' of
https://github.com/srdo/storm into STORM-1837
Repository: storm
Updated Branches:
refs/heads/1.x-branch 9d039588f -> 58cbfe5e2
Merge branch 'STORM-1837-2' of https://github.com/srdo/storm into STORM-1837
STORM-1837: Fix complete-topology and prevent message loss
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9899ac3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9899ac3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9899ac3
Branch: refs/heads/1.x-branch
Commit: b9899ac3050651d8c6ac6f64c31cdcdc369b22db
Parents: 9d03958
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 19 15:12:25 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 19 15:27:13 2016 -0500
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/testing.clj | 3 +-
.../apache/storm/messaging/local/Context.java | 70 +++++++++++++++++---
.../org/apache/storm/testing4j_test.clj | 25 ++++---
3 files changed, 79 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 5e5700c..565d1b9 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -540,7 +540,8 @@
(startup spout))
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
- (advance-cluster-time cluster-map 11)
+ (when (Time/isSimulating)
+ (advance-cluster-time cluster-map 11))
(let [storm-id (common/get-storm-id state storm-name)]
;;Give the topology time to come up without using it to wait for the spouts to complete
http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
index 4f0ba1f..7300847 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -27,8 +27,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
-import org.apache.storm.Config;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.TaskMessage;
@@ -39,7 +43,7 @@ public class Context implements IContext {
private static final Logger LOG = LoggerFactory.getLogger(Context.class);
private static class LocalServer implements IConnection {
- IConnectionCallback _cb;
+ volatile IConnectionCallback _cb;
final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
@Override
@@ -82,31 +86,76 @@ public class Context implements IContext {
private static class LocalClient implements IConnection {
private final LocalServer _server;
+ //Messages sent before the server registered a callback
+ private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
+ private final ScheduledExecutorService _pendingFlusher;
public LocalClient(LocalServer server) {
_server = server;
+ _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
+ _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable);
+ thread.setName("LocalClientFlusher-" + thread.getId());
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ _pendingFlusher.scheduleAtFixedRate(new Runnable(){
+ @Override
+ public void run(){
+ try {
+ //Ensure messages are flushed even if no more sends are performed
+ flushPending();
+ } catch (Throwable t) {
+ LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
+ throw new RuntimeException(t);
+ }
+ }
+ }, 5, 5, TimeUnit.SECONDS);
}
@Override
public void registerRecv(IConnectionCallback cb) {
throw new IllegalArgumentException("SHOULD NOT HAPPEN");
}
-
+
+ private void flushPending(){
+ IConnectionCallback serverCb = _server._cb;
+ if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+ ArrayList<TaskMessage> ret = new ArrayList<>();
+ _pendingDueToUnregisteredServer.drainTo(ret);
+ serverCb.recv(ret);
+ }
+ }
+
@Override
public void send(int taskId, byte[] payload) {
- if (_server._cb != null) {
- _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
+ TaskMessage message = new TaskMessage(taskId, payload);
+ IConnectionCallback serverCb = _server._cb;
+ if (serverCb != null) {
+ flushPending();
+ serverCb.recv(Arrays.asList(message));
+ } else {
+ _pendingDueToUnregisteredServer.add(message);
}
}
@Override
public void send(Iterator<TaskMessage> msgs) {
- if (_server._cb != null) {
+ IConnectionCallback serverCb = _server._cb;
+ if (serverCb != null) {
+ flushPending();
ArrayList<TaskMessage> ret = new ArrayList<>();
while (msgs.hasNext()) {
ret.add(msgs.next());
}
- _server._cb.recv(ret);
+ serverCb.recv(ret);
+ } else {
+ while(msgs.hasNext()){
+ _pendingDueToUnregisteredServer.add(msgs.next());
+ }
}
}
@@ -122,7 +171,12 @@ public class Context implements IContext {
@Override
public void close() {
- //NOOP
+ _pendingFlusher.shutdown();
+ try{
+ _pendingFlusher.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e){
+ throw new RuntimeException("Interrupted while awaiting flusher shutdown", e);
+ }
}
};
http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index cd139d7..5f71e86 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -59,15 +59,8 @@
(is (Time/isSimulating)))))
(is (not (Time/isSimulating)))))
-(deftest test-complete-topology
- (doseq [zmq-on? [true false]
- :let [daemon-conf (doto (Config.)
- (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
- mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 4))
- (.setDaemonConf daemon-conf))]]
- (Testing/withSimulatedTimeLocalCluster
- (reify TestJob
+(def complete-topology-testjob
+ (reify TestJob
(^void run [this ^ILocalCluster cluster]
(let [topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
@@ -97,7 +90,19 @@
(Testing/readTuples results "3")))
(is (= [[1] [2] [3] [4]]
(Testing/readTuples results "4")))
- ))))))
+ ))))
+
+(deftest test-complete-topology
+ (doseq [zmq-on? [true false]
+ :let [daemon-conf (doto (Config.)
+ (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4))
+ (.setDaemonConf daemon-conf))]]
+ (Testing/withSimulatedTimeLocalCluster
+ mk-cluster-param complete-topology-testjob )
+ (Testing/withLocalCluster
+ mk-cluster-param complete-topology-testjob)))
(deftest test-with-tracked-cluster
(Testing/withTrackedCluster
[2/2] storm git commit: Added STORM-1837 to Changelog
Posted by bo...@apache.org.
Added STORM-1837 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58cbfe5e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58cbfe5e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58cbfe5e
Branch: refs/heads/1.x-branch
Commit: 58cbfe5e280cdff4313a51aa786fdda13b404694
Parents: b9899ac
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 19 15:38:59 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 19 15:38:59 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/58cbfe5e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ab0bc6c..a7a2f9c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1837: Fix complete-topology and prevent message loss
* STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument
* STORM-2092: optimize TridentKafkaState batch sending
* STORM-1979: Storm Druid Connector implementation.