You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/11 18:10:13 UTC

[41/50] [abbrv] git commit: STORM-297: wait for the bolt to finish before killing the topology, otherwise netty client may be killed message before all bolts finish.

STORM-297: wait for the bolt to finish before killing the topology, otherwise netty client may be killed message before all bolts finish.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5f7520af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5f7520af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5f7520af

Branch: refs/heads/security
Commit: 5f7520af012f9e5ab3bf7e051b0410aaf765bd61
Parents: b4422f1
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 01:55:50 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 01:55:50 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj         | 9 +++++++--
 storm-core/test/clj/backtype/storm/messaging_test.clj | 2 +-
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5f7520af/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 3ce2c3f..9a61d75 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -435,7 +435,8 @@
     ))
 
 ;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil]
+;; kill-waiting: seconds to wait before we kill the topology after all spout completed
+(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :kill-waiting 0]
   ;; TODO: the idea of mocking for transactional topologies should be done an
   ;; abstraction level above... should have a complete-transactional-topology for this
   (let [{topology :topology capturer :capturer} (capture-topology topology)
@@ -470,7 +471,11 @@
     (let [storm-id (common/get-storm-id state storm-name)]
       (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
         (simulate-wait cluster-map))
-
+      
+      ;; spout finished not necesary means the topology finished, If the topology requires more time, set the kill-waiting
+      ;; to bigger value    
+      (Thread/sleep (* 1000 kill-waiting))
+      
       (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
       (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
         (simulate-wait cluster-map))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5f7520af/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index c719c68..aefee0a 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -85,7 +85,7 @@
                                                  :parallelism-hint 4)
                         })
             results (complete-topology cluster
-                                       topology)]
+                                       topology :kill-waiting 10)]
         
         ;; No error Tuple from Bolt TestEventOrderCheckBolt
         (is (empty? (read-tuples results "2"))))))