You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:37 UTC

[14/35] storm git commit: Merge branch 'master' into supervisor

Merge branch 'master' into supervisor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c8b3c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c8b3c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c8b3c3

Branch: refs/heads/master
Commit: 69c8b3c31d4ee528aea58f716b092c24ba6b0b1a
Parents: f78c36d 6390d18
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Thu Mar 10 23:26:42 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Thu Mar 10 23:26:42 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 .../src/clj/org/apache/storm/converter.clj      |  15 +
 .../src/clj/org/apache/storm/daemon/common.clj  | 350 +-----------
 .../clj/org/apache/storm/daemon/executor.clj    |  24 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  63 +--
 .../src/clj/org/apache/storm/daemon/task.clj    |   5 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  24 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  98 ++--
 storm-core/src/clj/org/apache/storm/ui/core.clj |  15 +-
 .../org/apache/storm/daemon/DaemonCommon.java   |  22 +
 .../org/apache/storm/daemon/StormCommon.java    | 537 +++++++++++++++++++
 .../storm/utils/StormCommonInstaller.java       |  43 ++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  16 +
 .../org/apache/storm/integration_test.clj       |   6 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 121 +++--
 .../apache/storm/security/auth/auth_test.clj    |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  11 +-
 18 files changed, 833 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index dd1f2df,bda09ee..4cec39a
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -290,13 -286,13 +291,13 @@@
    ([cluster-map timeout-ms]
    ;; wait until all workers, supervisors, and nimbus is waiting
    (let [supervisors @(:supervisors cluster-map)
-         workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
+         workers (filter (partial instance? DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
          daemons (concat
                    [(:nimbus cluster-map)]
 -                  supervisors
                    ; because a worker may already be dead
                    workers)]
 -    (while-timeout timeout-ms (not (every? (memfn isWaiting) daemons))
 +    (while-timeout timeout-ms (or (not (every? (memfn waiting?) daemons))
 +                                (not (every? is-supervisor-waiting supervisors)))
                     (Thread/sleep (rand-int 20))
                     ;;      (doseq [d daemons]
                     ;;        (if-not ((memfn waiting?) d)
@@@ -377,26 -373,27 +378,28 @@@
  
  (defn submit-mocked-assignment
    [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
-   (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
-                    nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
-                    nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
-                                                           storm-cluster-state
-                                                           storm-name
-                                                           worker->resources)
-                    nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
-                                                                       storm-cluster-state
-                                                                       storm-name
-                                                                       executor->node+port)]
-     (submit-local-topology nimbus storm-name conf topology)))
+   (let [fake-common (proxy [StormCommon] []
+                       (stormTaskInfoImpl [_] task->component))]
+     (with-open [- (StormCommonInstaller. fake-common)]
+       (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
+                        nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
+                                                               storm-cluster-state
+                                                               storm-name
+                                                               worker->resources)
+                        nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
+                                                                           storm-cluster-state
+                                                                           storm-name
+                                                                           executor->node+port)]
+         (submit-local-topology nimbus storm-name conf topology)))))
  
  (defn mk-capture-launch-fn [capture-atom]
 -  (fn [supervisor storm-id port worker-id mem-onheap]
 -    (let [supervisor-id (:supervisor-id supervisor)
 -          conf (:conf supervisor)
 -          existing (get @capture-atom [supervisor-id port] [])]
 -      (ConfigUtils/setWorkerUserWSE conf worker-id "")
 -      (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
 +  (fn [supervisorData stormId port workerId resources]
 +    (let [conf (.getConf supervisorData)
 +          supervisorId (.getSupervisorId supervisorData)
 +          existing (get @capture-atom [supervisorId port] [])]
 +      (log-message "mk-capture-launch-fn")
 +      (ConfigUtils/setWorkerUserWSE conf workerId "")
 +      (swap! capture-atom assoc [supervisorId port] (conj existing stormId)))))
  
  (defn find-worker-id
    [supervisor-conf port]

http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 2ff21ac,ade1c2f..d3d7344
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -37,9 -34,10 +37,10 @@@
    (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
             [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
    (:import [java.nio.file.attribute FileAttribute])
+   (:import [org.apache.storm.daemon StormCommon])
    (:use [org.apache.storm config testing util log converter])
    (:use [org.apache.storm.daemon common])
 -  (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]])
 +  (:require [org.apache.storm.daemon [worker :as worker] [local-supervisor :as local-supervisor]])
    (:use [conjure core])
    (:require [clojure.java.io :as io]))
  
@@@ -871,8 -840,9 +872,8 @@@
                          ))
          (validate-launched-once (:launched changed)
            {"sup1" [1 2]}
-           (get-storm-id (:storm-cluster-state cluster) "topology1"))
+           (StormCommon/getStormId (:storm-cluster-state cluster) "topology1"))
          (validate-launched-once (:launched changed)
            {"sup1" [3 4]}
-           (get-storm-id (:storm-cluster-state cluster) "topology2"))
+           (StormCommon/getStormId (:storm-cluster-state cluster) "topology2"))
          )))
 -