You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/18 15:30:18 UTC
[1/3] git commit: Allow users to pass TEST-TIMEOUT-MS as param
Repository: incubator-storm
Updated Branches:
refs/heads/master 69b4601f7 -> 85c9cd5ae
Allow users to pass TEST-TIMEOUT-MS as param
It would be nice if `complete-topology` allowed a usere to pass in
the default timeout as a parameter. This PR adds functionality without
breaking any existing code. Tests pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c86dbf95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c86dbf95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c86dbf95
Branch: refs/heads/master
Commit: c86dbf956aabcd2623bdbc937dd51acfafe87d4a
Parents: 4ac0bec
Author: Aaron Levin <aa...@demeure.com>
Authored: Wed May 21 15:43:40 2014 -0400
Committer: Aaron Levin <aa...@demeure.com>
Committed: Wed May 21 15:43:40 2014 -0400
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/testing.clj | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c86dbf95/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 7bbe238..fd97255 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -195,7 +195,8 @@
(defn wait-until-cluster-waiting
"Wait until the cluster is idle. Should be used with time simulation."
- [cluster-map]
+ ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
+ ([cluster-map timeout-ms]
;; wait until all workers, supervisors, and nimbus is waiting
(let [supervisors @(:supervisors cluster-map)
workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
@@ -204,12 +205,12 @@
supervisors
workers) ; because a worker may already be dead
]
- (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
+ (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
(Thread/sleep 10)
;; (doseq [d daemons]
;; (if-not ((memfn waiting?) d)
;; (println d)))
- )))
+ ))))
(defn advance-cluster-time
([cluster-map secs increment-secs]
@@ -438,7 +439,7 @@
))
;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil]
+(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :timeout-ms TEST-TIMEOUT-MS]
;; TODO: the idea of mocking for transactional topologies should be done an
;; abstraction level above... should have a complete-transactional-topology for this
(let [{topology :topology capturer :capturer} (capture-topology topology)
@@ -471,11 +472,11 @@
(let [storm-id (common/get-storm-id state storm-name)]
- (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
+ (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
(simulate-wait cluster-map))
(.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
- (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
+ (while-timeout timeout-ms (.assignment-info state storm-id nil)
(simulate-wait cluster-map))
(when cleanup-state
(doseq [spout (spout-objects spouts)]
@@ -573,8 +574,10 @@
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
([tracked-topology]
- (tracked-wait tracked-topology 1))
+ (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
([tracked-topology amt]
+ (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+ ([tracked-topology amt timeout-ms]
(let [target (+ amt @(:last-spout-emit tracked-topology))
track-id (-> tracked-topology :cluster ::track-id)
waiting? (fn []
[2/3] git commit: Merge branch 'master' of
https://github.com/aaronlevin/incubator-storm into STORM-354
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/aaronlevin/incubator-storm into STORM-354
Conflicts:
storm-core/src/clj/backtype/storm/testing.clj
STORM-354: Testing: allow users to pass TEST-TIMEOUT-MS as param for complete-topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/44aaaa91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/44aaaa91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/44aaaa91
Branch: refs/heads/master
Commit: 44aaaa91be7cfa5fef8424f08a25bd1e8b3bf458
Parents: 69b4601 c86dbf9
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Fri Jul 18 08:25:44 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Fri Jul 18 08:25:44 2014 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/testing.clj | 48 ++++++++++++----------
1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/44aaaa91/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/testing.clj
index 70c783a,fd97255..54f40e0
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@@ -205,14 -203,14 +206,14 @@@
daemons (concat
[(:nimbus cluster-map)]
supervisors
- workers) ; because a worker may already be dead
- ]
+ ; because a worker may already be dead
+ workers)]
- (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
+ (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
- (Thread/sleep 10)
-;; (doseq [d daemons]
-;; (if-not ((memfn waiting?) d)
-;; (println d)))
- ))))
+ (Thread/sleep 10)
+ ;; (doseq [d daemons]
+ ;; (if-not ((memfn waiting?) d)
+ ;; (println d)))
- )))
++ ))))
(defn advance-cluster-time
([cluster-map secs increment-secs]
@@@ -437,24 -425,21 +438,25 @@@
(.set_bolts topology
(assoc (clojurify-structure bolts)
(uuid)
- (Bolt.
- (serialize-component-object capturer)
- (mk-plain-component-common (into {} (for [[id direct?] all-streams]
- [id (if direct?
- (mk-direct-grouping)
- (mk-global-grouping))]))
- {}
- nil))
- ))
+ (Bolt.
+ (serialize-component-object capturer)
+ (mk-plain-component-common (into {} (for [[id direct?] all-streams]
+ [id (if direct?
+ (mk-direct-grouping)
+ (mk-global-grouping))]))
+ {}
+ nil))))
{:topology topology
- :capturer capturer}
- ))
+ :capturer capturer}))
;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :timeout-ms TEST-TIMEOUT-MS]
+(defnk complete-topology
+ [cluster-map topology
+ :mock-sources {}
+ :storm-conf {}
+ :cleanup-state true
- :topology-name nil]
++ :topology-name nil
++ :timeout-ms TEST-TIMEOUT-MS]
;; TODO: the idea of mocking for transactional topologies should be done an
;; abstraction level above... should have a complete-transactional-topology for this
(let [{topology :topology capturer :capturer} (capture-topology topology)
@@@ -477,16 -467,17 +479,16 @@@
(doseq [spout (spout-objects spouts)]
(startup spout))
-
+
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
-
-
+
(let [storm-id (common/get-storm-id state storm-name)]
- (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
+ (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
- (simulate-wait cluster-map))
+ (simulate-wait cluster-map))
(.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
- (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
+ (while-timeout timeout-ms (.assignment-info state storm-id nil)
- (simulate-wait cluster-map))
+ (simulate-wait cluster-map))
(when cleanup-state
(doseq [spout (spout-objects spouts)]
(cleanup spout))))
@@@ -580,27 -574,29 +582,29 @@@
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
([tracked-topology]
- (tracked-wait tracked-topology 1))
+ (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
([tracked-topology amt]
- (let [target (+ amt @(:last-spout-emit tracked-topology))
- track-id (-> tracked-topology :cluster ::track-id)
- waiting? (fn []
- (or (not= target (global-amt track-id "spout-emitted"))
- (not= (global-amt track-id "transferred")
- (global-amt track-id "processed"))
- ))]
- (while-timeout TEST-TIMEOUT-MS (waiting?)
- ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
- ;; (println "Processed: " (global-amt track-id "processed"))
- ;; (println "Transferred: " (global-amt track-id "transferred"))
- (Thread/sleep 500))
- (reset! (:last-spout-emit tracked-topology) target))))
-
- (defnk test-tuple
+ (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+ ([tracked-topology amt timeout-ms]
+ (let [target (+ amt @(:last-spout-emit tracked-topology))
+ track-id (-> tracked-topology :cluster ::track-id)
+ waiting? (fn []
+ (or (not= target (global-amt track-id "spout-emitted"))
+ (not= (global-amt track-id "transferred")
+ (global-amt track-id "processed"))
+ ))]
+ (while-timeout TEST-TIMEOUT-MS (waiting?)
- ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
- ;; (println "Processed: " (global-amt track-id "processed"))
- ;; (println "Transferred: " (global-amt track-id "transferred"))
- (Thread/sleep 500))
- (reset! (:last-spout-emit tracked-topology) target)
- )))
-
-(defnk test-tuple [values
- :stream Utils/DEFAULT_STREAM_ID
- :component "component"
- :fields nil]
++ ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
++ ;; (println "Processed: " (global-amt track-id "processed"))
++ ;; (println "Transferred: " (global-amt track-id "transferred"))
++ (Thread/sleep 500))
++ (reset! (:last-spout-emit tracked-topology) target))))
++
++(defnk test-tuple
+ [values
+ :stream Utils/DEFAULT_STREAM_ID
+ :component "component"
+ :fields nil]
(let [fields (or fields
(->> (iterate inc 1)
(take (count values))
[3/3] git commit: Added STROM-354 to changelog and readme.
Posted by bo...@apache.org.
Added STROM-354 to changelog and readme.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/85c9cd5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/85c9cd5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/85c9cd5a
Branch: refs/heads/master
Commit: 85c9cd5aee09ac2bf510fbc7fe877ee16c76ee28
Parents: 44aaaa9
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Fri Jul 18 08:28:52 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Fri Jul 18 08:28:52 2014 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
README.markdown | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/85c9cd5a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2817e5d..777ed9a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
* STORM-328: More restrictive Config checks, strict range check within Utils.getInt()
* STORM-381: Replace broken jquery.tablesorter.min.js to latest
* STORM-312: add storm monitor tools to monitor throughtput interactively
+ * STORM-354: Testing: allow users to pass TEST-TIMEOUT-MS as param for complete-topology
## 0.9.2-incubating
* STORM-66: send taskid on initial handshake
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/85c9cd5a/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index 03b4ccd..063de70 100644
--- a/README.markdown
+++ b/README.markdown
@@ -151,6 +151,7 @@ under the License.
* DashengJu ([@dashengju](https://github.com/dashengju))
* Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
* Li Jiahong ([@Gvain](https://github.com/Gvain))
+* Aaron Levin ([@aaronlevin](https://github.com/aaronlevin))
## Acknowledgements