You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 22:58:08 UTC
[09/50] git commit: Merge branch 'master' of
https://github.com/aaronlevin/incubator-storm into STORM-354
Merge branch 'master' of https://github.com/aaronlevin/incubator-storm into STORM-354
Conflicts:
storm-core/src/clj/backtype/storm/testing.clj
STORM-354: Testing: allow users to pass TEST-TIMEOUT-MS as param for complete-topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/44aaaa91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/44aaaa91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/44aaaa91
Branch: refs/heads/security
Commit: 44aaaa91be7cfa5fef8424f08a25bd1e8b3bf458
Parents: 69b4601 c86dbf9
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Fri Jul 18 08:25:44 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Fri Jul 18 08:25:44 2014 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/testing.clj | 48 ++++++++++++----------
1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/44aaaa91/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/testing.clj
index 70c783a,fd97255..54f40e0
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@@ -205,14 -203,14 +206,14 @@@
daemons (concat
[(:nimbus cluster-map)]
supervisors
- workers) ; because a worker may already be dead
- ]
+ ; because a worker may already be dead
+ workers)]
- (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
+ (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
- (Thread/sleep 10)
-;; (doseq [d daemons]
-;; (if-not ((memfn waiting?) d)
-;; (println d)))
- ))))
+ (Thread/sleep 10)
+ ;; (doseq [d daemons]
+ ;; (if-not ((memfn waiting?) d)
+ ;; (println d)))
- )))
++ ))))
(defn advance-cluster-time
([cluster-map secs increment-secs]
@@@ -437,24 -425,21 +438,25 @@@
(.set_bolts topology
(assoc (clojurify-structure bolts)
(uuid)
- (Bolt.
- (serialize-component-object capturer)
- (mk-plain-component-common (into {} (for [[id direct?] all-streams]
- [id (if direct?
- (mk-direct-grouping)
- (mk-global-grouping))]))
- {}
- nil))
- ))
+ (Bolt.
+ (serialize-component-object capturer)
+ (mk-plain-component-common (into {} (for [[id direct?] all-streams]
+ [id (if direct?
+ (mk-direct-grouping)
+ (mk-global-grouping))]))
+ {}
+ nil))))
{:topology topology
- :capturer capturer}
- ))
+ :capturer capturer}))
;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :timeout-ms TEST-TIMEOUT-MS]
+(defnk complete-topology
+ [cluster-map topology
+ :mock-sources {}
+ :storm-conf {}
+ :cleanup-state true
- :topology-name nil]
++ :topology-name nil
++ :timeout-ms TEST-TIMEOUT-MS]
;; TODO: the idea of mocking for transactional topologies should be done an
;; abstraction level above... should have a complete-transactional-topology for this
(let [{topology :topology capturer :capturer} (capture-topology topology)
@@@ -477,16 -467,17 +479,16 @@@
(doseq [spout (spout-objects spouts)]
(startup spout))
-
+
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
-
-
+
(let [storm-id (common/get-storm-id state storm-name)]
- (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
+ (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
- (simulate-wait cluster-map))
+ (simulate-wait cluster-map))
(.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
- (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
+ (while-timeout timeout-ms (.assignment-info state storm-id nil)
- (simulate-wait cluster-map))
+ (simulate-wait cluster-map))
(when cleanup-state
(doseq [spout (spout-objects spouts)]
(cleanup spout))))
@@@ -580,27 -574,29 +582,29 @@@
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
([tracked-topology]
- (tracked-wait tracked-topology 1))
+ (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
([tracked-topology amt]
- (let [target (+ amt @(:last-spout-emit tracked-topology))
- track-id (-> tracked-topology :cluster ::track-id)
- waiting? (fn []
- (or (not= target (global-amt track-id "spout-emitted"))
- (not= (global-amt track-id "transferred")
- (global-amt track-id "processed"))
- ))]
- (while-timeout TEST-TIMEOUT-MS (waiting?)
- ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
- ;; (println "Processed: " (global-amt track-id "processed"))
- ;; (println "Transferred: " (global-amt track-id "transferred"))
- (Thread/sleep 500))
- (reset! (:last-spout-emit tracked-topology) target))))
-
- (defnk test-tuple
+ (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+ ([tracked-topology amt timeout-ms]
+ (let [target (+ amt @(:last-spout-emit tracked-topology))
+ track-id (-> tracked-topology :cluster ::track-id)
+ waiting? (fn []
+ (or (not= target (global-amt track-id "spout-emitted"))
+ (not= (global-amt track-id "transferred")
+ (global-amt track-id "processed"))
+ ))]
+ (while-timeout TEST-TIMEOUT-MS (waiting?)
- ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
- ;; (println "Processed: " (global-amt track-id "processed"))
- ;; (println "Transferred: " (global-amt track-id "transferred"))
- (Thread/sleep 500))
- (reset! (:last-spout-emit tracked-topology) target)
- )))
-
-(defnk test-tuple [values
- :stream Utils/DEFAULT_STREAM_ID
- :component "component"
- :fields nil]
++ ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
++ ;; (println "Processed: " (global-amt track-id "processed"))
++ ;; (println "Transferred: " (global-amt track-id "transferred"))
++ (Thread/sleep 500))
++ (reset! (:last-spout-emit tracked-topology) target))))
++
++(defnk test-tuple
+ [values
+ :stream Utils/DEFAULT_STREAM_ID
+ :component "component"
+ :fields nil]
(let [fields (or fields
(->> (iterate inc 1)
(take (count values))