You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:21 UTC

[11/27] storm git commit: Merge branch 'master' into ClusterUtils

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b89b7bb,9c31ddf..3ebdbcd
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -21,20 -21,20 +21,23 @@@
    (:require [clojure [string :as string] [set :as set]])
    (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout])
    (:import [org.apache.storm.scheduler ISupervisor])
-   (:import [org.apache.storm.utils ConfigUtils])
+   (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
    (:import [org.apache.storm.generated RebalanceOptions])
-   (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster])
 -  (:import [org.mockito Matchers Mockito])
++  (:import [org.apache.storm.testing.staticmocking MockedCluster])
    (:import [java.util UUID])
-   (:import [org.mockito Mockito])
++  (:import [org.mockito Mockito Matchers])
 +  (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [java.io File])
    (:import [java.nio.file Files])
-   (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
 -  (:import [org.apache.storm.utils Utils IPredicate]
++  (:import [org.apache.storm.utils Utils IPredicate])
++  (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
+            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
+                                                  UtilsInstaller])
    (:import [java.nio.file.attribute FileAttribute])
 -  (:use [org.apache.storm config testing util timer log])
 +  (:use [org.apache.storm config testing util timer log converter])
    (:use [org.apache.storm.daemon common])
    (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
 -            [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
 +            [org.apache.storm [thrift :as thrift]])
    (:use [conjure core])
    (:require [clojure.java.io :as io]))
  
@@@ -43,9 -43,10 +46,10 @@@
    [cluster supervisor-id port]
    (let [state (:storm-cluster-state cluster)
          slot-assigns (for [storm-id (.assignments state nil)]
 -                        (let [executors (-> (.assignment-info state storm-id nil)
 +                        (let [executors (-> (clojurify-assignment (.assignmentInfo state storm-id nil))
                                          :executor->node+port
-                                         reverse-map
+                                         (Utils/reverseMap)
+                                         clojurify-structure
                                          (get [supervisor-id port] ))]
                            (when executors [storm-id executors])
                            ))
@@@ -565,198 -632,203 +635,201 @@@
            fake-isupervisor (reify ISupervisor
                               (getSupervisorId [this] nil)
                               (getAssignmentId [this] nil))
+           fake-cu (proxy [ConfigUtils] []
+                     (supervisorStateImpl [conf] nil)
+                     (supervisorLocalDirImpl [conf] nil))
+           fake-utils (proxy [Utils] []
+                        (localHostnameImpl [] nil)
+                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
 -                                                (upTime [] 0))))]
++                                                (upTime [] 0))))
 +          cluster-utils (Mockito/mock ClusterUtils)]
-       (with-open [_ (proxy [MockedConfigUtils] []
-                       (supervisorStateImpl [conf] nil)
-                       (supervisorLocalDirImpl [conf] nil))
+       (with-open [_ (ConfigUtilsInstaller. fake-cu)
 -                  _ (UtilsInstaller. fake-utils)]
 -        (stubbing [cluster/mk-storm-cluster-state nil
 -                   mk-timer nil]
++                  _ (UtilsInstaller. fake-utils)
 +                  mocked-cluster (MockedCluster. cluster-utils)]
-         (stubbing [uptime-computer nil
-               ;   cluster/mk-storm-cluster-state nil
-                  local-hostname nil
-                  mk-timer nil]
            (supervisor/supervisor-data auth-conf nil fake-isupervisor)
-           (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
-         ;  (verify-call-times-for cluster/mk-storm-cluster-state 1)
-         ;  (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
-         ;                                     expected-acls)
-          )))))
 -          (verify-call-times-for cluster/mk-storm-cluster-state 1)
 -          (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
 -              expected-acls)))))
++          (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))))))
  
- (deftest test-write-log-metadata
-   (testing "supervisor writes correct data to logs metadata file"
-     (let [exp-owner "alice"
-           exp-worker-id "42"
-           exp-storm-id "0123456789"
-           exp-port 4242
-           exp-logs-users ["bob" "charlie" "daryl"]
-           exp-logs-groups ["read-only-group" "special-group"]
-           storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
-                       TOPOLOGY-USERS ["charlie" "bob"]
-                       TOPOLOGY-GROUPS ["special-group"]
-                       LOGS-GROUPS ["read-only-group"]
-                       LOGS-USERS ["daryl"]}
-           exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
-                     "worker-id" exp-worker-id
-                     LOGS-USERS exp-logs-users
-                     LOGS-GROUPS exp-logs-groups}
-           conf {}]
-       (mocking [supervisor/write-log-metadata-to-yaml-file!]
-         (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
-                                         exp-storm-id exp-port conf)
-         (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
-                                       exp-storm-id exp-port exp-data conf)))))
+   (deftest test-write-log-metadata
+     (testing "supervisor writes correct data to logs metadata file"
+       (let [exp-owner "alice"
+             exp-worker-id "42"
+             exp-storm-id "0123456789"
+             exp-port 4242
+             exp-logs-users ["bob" "charlie" "daryl"]
+             exp-logs-groups ["read-only-group" "special-group"]
+             storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+                         TOPOLOGY-USERS ["charlie" "bob"]
+                         TOPOLOGY-GROUPS ["special-group"]
+                         LOGS-GROUPS ["read-only-group"]
+                         LOGS-USERS ["daryl"]}
+             exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+                       "worker-id" exp-worker-id
+                       LOGS-USERS exp-logs-users
+                       LOGS-GROUPS exp-logs-groups}
+             conf {}]
+         (mocking [supervisor/write-log-metadata-to-yaml-file!]
+           (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+             exp-storm-id exp-port conf)
+           (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+             exp-storm-id exp-port exp-data conf)))))
  
- (deftest test-worker-launcher-requires-user
-   (testing "worker-launcher throws on blank user"
-     (mocking [launch-process]
-       (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
-                                   #"(?i).*user cannot be blank.*"
-                                   (supervisor/worker-launcher {} nil ""))))))
+   (deftest test-worker-launcher-requires-user
+     (testing "worker-launcher throws on blank user"
+       (let [utils-proxy (proxy [Utils] []
+                           (launchProcessImpl [& _] nil))]
+         (with-open [_ (UtilsInstaller. utils-proxy)]
+           (is (try
+                 (supervisor/worker-launcher {} nil "")
+                 false
+                 (catch Throwable t
+                   (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+                        (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
  
- (defn found? [sub-str input-str]
-   (if (string? input-str)
-     (contrib-str/substring? sub-str (str input-str))
-     (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+   (defn found? [sub-str input-str]
+     (if (string? input-str)
+       (contrib-str/substring? sub-str (str input-str))
+       (boolean (some #(contrib-str/substring? sub-str %) input-str))))
  
- (defn not-found? [sub-str input-str]
+   (defn not-found? [sub-str input-str]
      (complement (found? sub-str input-str)))
  
- (deftest test-substitute-childopts-happy-path-string
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-happy-path-string
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-happy-path-list
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-happy-path-list
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-happy-path-list-arraylist
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-happy-path-list-arraylist
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-topology-id-alone
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-topology-id-alone
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-no-keys
-   (testing "worker-launcher has no ids to replace in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-no-keys
+     (testing "worker-launcher has no ids to replace in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-nil-childopts
-   (testing "worker-launcher has nil childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts nil
-           expected-childopts nil
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-nil-childopts
+     (testing "worker-launcher has nil childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts nil
+             expected-childopts nil
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-nil-ids
-   (testing "worker-launcher has nil ids"
-     (let [worker-id nil
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-nil-ids
+     (testing "worker-launcher has nil ids"
+       (let [worker-id nil
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-retry-read-assignments
-   (with-simulated-time-local-cluster [cluster
-                                       :supervisors 0
-                                       :ports-per-supervisor 2
-                                       :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                     NIMBUS-MONITOR-FREQ-SECS 10
-                                                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                     TOPOLOGY-ACKER-EXECUTORS 0}]
-     (letlocals
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind topology1 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind topology2 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind state (:storm-cluster-state cluster))
-      (bind changed (capture-changed-workers
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology1"
-                      {TOPOLOGY-WORKERS 2}
-                      topology1
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology2"
-                      {TOPOLOGY-WORKERS 2}
-                      topology2
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                     (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                     ))
-      (is (empty? (:launched changed)))
-      (bind options (RebalanceOptions.))
-      (.set_wait_secs options 0)
-      (bind changed (capture-changed-workers
-                     (.rebalance (:nimbus cluster) "topology2" options)
-                     (advance-cluster-time cluster 10)
-                     (heartbeat-workers cluster "sup1" [1 2 3 4])
-                     (advance-cluster-time cluster 10)
-                     ))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [1 2]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology1"))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [3 4]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-      )))
+   (deftest test-retry-read-assignments
+     (with-simulated-time-local-cluster [cluster
+                                         :supervisors 0
+                                         :ports-per-supervisor 2
+                                         :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+                                                       NIMBUS-MONITOR-FREQ-SECS 10
+                                                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                       TOPOLOGY-ACKER-EXECUTORS 0}]
+       (letlocals
+         (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+         (bind topology1 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind topology2 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind state (:storm-cluster-state cluster))
+         (bind changed (capture-changed-workers
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology1"
+                           {TOPOLOGY-WORKERS 2}
+                           topology1
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology2"
+                           {TOPOLOGY-WORKERS 2}
+                           topology2
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+                         (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+                         ))
+         (is (empty? (:launched changed)))
+         (bind options (RebalanceOptions.))
+         (.set_wait_secs options 0)
+         (bind changed (capture-changed-workers
+                         (.rebalance (:nimbus cluster) "topology2" options)
+                         (advance-cluster-time cluster 10)
+                         (heartbeat-workers cluster "sup1" [1 2 3 4])
+                         (advance-cluster-time cluster 10)
+                         ))
+         (validate-launched-once (:launched changed)
+           {"sup1" [1 2]}
+           (get-storm-id (:storm-cluster-state cluster) "topology1"))
+         (validate-launched-once (:launched changed)
+           {"sup1" [3 4]}
+           (get-storm-id (:storm-cluster-state cluster) "topology2"))
 -        ))))
++        )))