You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/18 15:30:19 UTC

[2/3] git commit: Merge branch 'master' of https://github.com/aaronlevin/incubator-storm into STORM-354

Merge branch 'master' of https://github.com/aaronlevin/incubator-storm into STORM-354

Conflicts:
	storm-core/src/clj/backtype/storm/testing.clj

STORM-354: Testing: allow users to pass TEST-TIMEOUT-MS as param for complete-topology


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/44aaaa91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/44aaaa91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/44aaaa91

Branch: refs/heads/master
Commit: 44aaaa91be7cfa5fef8424f08a25bd1e8b3bf458
Parents: 69b4601 c86dbf9
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Fri Jul 18 08:25:44 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Fri Jul 18 08:25:44 2014 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj | 48 ++++++++++++----------
 1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/44aaaa91/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/testing.clj
index 70c783a,fd97255..54f40e0
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@@ -205,14 -203,14 +206,14 @@@
          daemons (concat
                    [(:nimbus cluster-map)]
                    supervisors
 -                  workers) ; because a worker may already be dead
 -        ]
 +                  ; because a worker may already be dead
 +                  workers)]
-     (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
+     (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
 -      (Thread/sleep 10)
 -;;      (doseq [d daemons]
 -;;        (if-not ((memfn waiting?) d)
 -;;          (println d)))
 -      ))))
 +                   (Thread/sleep 10)
 +                   ;;      (doseq [d daemons]
 +                   ;;        (if-not ((memfn waiting?) d)
 +                   ;;          (println d)))
-                    )))
++                   ))))
  
  (defn advance-cluster-time
    ([cluster-map secs increment-secs]
@@@ -437,24 -425,21 +438,25 @@@
      (.set_bolts topology
                  (assoc (clojurify-structure bolts)
                    (uuid)
 -                  (Bolt.                   
 -                   (serialize-component-object capturer)
 -                   (mk-plain-component-common (into {} (for [[id direct?] all-streams]
 -                                                         [id (if direct?
 -                                                               (mk-direct-grouping)
 -                                                               (mk-global-grouping))]))
 -                                              {}
 -                                              nil))
 -                  ))
 +                  (Bolt.
 +                    (serialize-component-object capturer)
 +                    (mk-plain-component-common (into {} (for [[id direct?] all-streams]
 +                                                          [id (if direct?
 +                                                                (mk-direct-grouping)
 +                                                                (mk-global-grouping))]))
 +                                               {}
 +                                               nil))))
      {:topology topology
 -     :capturer capturer}
 -    ))
 +     :capturer capturer}))
  
  ;; TODO: mock-sources needs to be able to mock out state spouts as well
 -(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :timeout-ms TEST-TIMEOUT-MS]
 +(defnk complete-topology
 +  [cluster-map topology
 +   :mock-sources {}
 +   :storm-conf {}
 +   :cleanup-state true
-    :topology-name nil]
++   :topology-name nil
++   :timeout-ms TEST-TIMEOUT-MS]
    ;; TODO: the idea of mocking for transactional topologies should be done an
    ;; abstraction level above... should have a complete-transactional-topology for this
    (let [{topology :topology capturer :capturer} (capture-topology topology)
@@@ -477,16 -467,17 +479,16 @@@
  
      (doseq [spout (spout-objects spouts)]
        (startup spout))
 -    
 +
      (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
 -    
 -    
 +
      (let [storm-id (common/get-storm-id state storm-name)]
-       (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
+       (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
 -        (simulate-wait cluster-map))
 +                     (simulate-wait cluster-map))
  
        (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
-       (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
+       (while-timeout timeout-ms (.assignment-info state storm-id nil)
 -        (simulate-wait cluster-map))
 +                     (simulate-wait cluster-map))
        (when cleanup-state
          (doseq [spout (spout-objects spouts)]
            (cleanup spout))))
@@@ -580,27 -574,29 +582,29 @@@
  (defn tracked-wait
    "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
    ([tracked-topology]
-    (tracked-wait tracked-topology 1))
+      (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
    ([tracked-topology amt]
-    (let [target (+ amt @(:last-spout-emit tracked-topology))
-          track-id (-> tracked-topology :cluster ::track-id)
-          waiting? (fn []
-                     (or (not= target (global-amt track-id "spout-emitted"))
-                         (not= (global-amt track-id "transferred")
-                               (global-amt track-id "processed"))
-                         ))]
-      (while-timeout TEST-TIMEOUT-MS (waiting?)
-                     ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
-                     ;; (println "Processed: " (global-amt track-id "processed"))
-                     ;; (println "Transferred: " (global-amt track-id "transferred"))
-                     (Thread/sleep 500))
-      (reset! (:last-spout-emit tracked-topology) target))))
- 
- (defnk test-tuple
+      (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+   ([tracked-topology amt timeout-ms]
+       (let [target (+ amt @(:last-spout-emit tracked-topology))
+             track-id (-> tracked-topology :cluster ::track-id)
+             waiting? (fn []
+                        (or (not= target (global-amt track-id "spout-emitted"))
+                            (not= (global-amt track-id "transferred")                                 
+                                  (global-amt track-id "processed"))
+                            ))]
+         (while-timeout TEST-TIMEOUT-MS (waiting?)
 -          ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
 -          ;; (println "Processed: " (global-amt track-id "processed"))
 -          ;; (println "Transferred: " (global-amt track-id "transferred"))
 -          (Thread/sleep 500))
 -        (reset! (:last-spout-emit tracked-topology) target)
 -        )))
 -
 -(defnk test-tuple [values
 -                   :stream Utils/DEFAULT_STREAM_ID
 -                   :component "component"
 -                   :fields nil]
++                       ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
++                       ;; (println "Processed: " (global-amt track-id "processed"))
++                       ;; (println "Transferred: " (global-amt track-id "transferred"))
++                       (Thread/sleep 500))
++        (reset! (:last-spout-emit tracked-topology) target))))
++
++(defnk test-tuple 
 +  [values
 +   :stream Utils/DEFAULT_STREAM_ID
 +   :component "component"
 +   :fields nil]
    (let [fields (or fields
                     (->> (iterate inc 1)
                          (take (count values))