You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:37 UTC
[14/35] storm git commit: Merge branch 'master' into supervisor
Merge branch 'master' into supervisor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c8b3c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c8b3c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c8b3c3
Branch: refs/heads/master
Commit: 69c8b3c31d4ee528aea58f716b092c24ba6b0b1a
Parents: f78c36d 6390d18
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Thu Mar 10 23:26:42 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Thu Mar 10 23:26:42 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
.../src/clj/org/apache/storm/converter.clj | 15 +
.../src/clj/org/apache/storm/daemon/common.clj | 350 +-----------
.../clj/org/apache/storm/daemon/executor.clj | 24 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 2 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 63 +--
.../src/clj/org/apache/storm/daemon/task.clj | 5 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 24 +-
storm-core/src/clj/org/apache/storm/testing.clj | 98 ++--
storm-core/src/clj/org/apache/storm/ui/core.clj | 15 +-
.../org/apache/storm/daemon/DaemonCommon.java | 22 +
.../org/apache/storm/daemon/StormCommon.java | 537 +++++++++++++++++++
.../storm/utils/StormCommonInstaller.java | 43 ++
.../src/jvm/org/apache/storm/utils/Utils.java | 16 +
.../org/apache/storm/integration_test.clj | 6 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 121 +++--
.../apache/storm/security/auth/auth_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 11 +-
18 files changed, 833 insertions(+), 523 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index dd1f2df,bda09ee..4cec39a
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -290,13 -286,13 +291,13 @@@
([cluster-map timeout-ms]
;; wait until all workers, supervisors, and nimbus is waiting
(let [supervisors @(:supervisors cluster-map)
- workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
+ workers (filter (partial instance? DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
daemons (concat
[(:nimbus cluster-map)]
- supervisors
; because a worker may already be dead
workers)]
- (while-timeout timeout-ms (not (every? (memfn isWaiting) daemons))
+ (while-timeout timeout-ms (or (not (every? (memfn waiting?) daemons))
+ (not (every? is-supervisor-waiting supervisors)))
(Thread/sleep (rand-int 20))
;; (doseq [d daemons]
;; (if-not ((memfn waiting?) d)
@@@ -377,26 -373,27 +378,28 @@@
(defn submit-mocked-assignment
[nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
- (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
- nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
- nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
- storm-cluster-state
- storm-name
- worker->resources)
- nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
- storm-cluster-state
- storm-name
- executor->node+port)]
- (submit-local-topology nimbus storm-name conf topology)))
+ (let [fake-common (proxy [StormCommon] []
+ (stormTaskInfoImpl [_] task->component))]
+ (with-open [- (StormCommonInstaller. fake-common)]
+ (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
+ nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
+ storm-cluster-state
+ storm-name
+ worker->resources)
+ nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
+ storm-cluster-state
+ storm-name
+ executor->node+port)]
+ (submit-local-topology nimbus storm-name conf topology)))))
(defn mk-capture-launch-fn [capture-atom]
- (fn [supervisor storm-id port worker-id mem-onheap]
- (let [supervisor-id (:supervisor-id supervisor)
- conf (:conf supervisor)
- existing (get @capture-atom [supervisor-id port] [])]
- (ConfigUtils/setWorkerUserWSE conf worker-id "")
- (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
+ (fn [supervisorData stormId port workerId resources]
+ (let [conf (.getConf supervisorData)
+ supervisorId (.getSupervisorId supervisorData)
+ existing (get @capture-atom [supervisorId port] [])]
+ (log-message "mk-capture-launch-fn")
+ (ConfigUtils/setWorkerUserWSE conf workerId "")
+ (swap! capture-atom assoc [supervisorId port] (conj existing stormId)))))
(defn find-worker-id
[supervisor-conf port]
http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 2ff21ac,ade1c2f..d3d7344
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -37,9 -34,10 +37,10 @@@
(:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
[org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
(:import [java.nio.file.attribute FileAttribute])
+ (:import [org.apache.storm.daemon StormCommon])
(:use [org.apache.storm config testing util log converter])
(:use [org.apache.storm.daemon common])
- (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]])
+ (:require [org.apache.storm.daemon [worker :as worker] [local-supervisor :as local-supervisor]])
(:use [conjure core])
(:require [clojure.java.io :as io]))
@@@ -871,8 -840,9 +872,8 @@@
))
(validate-launched-once (:launched changed)
{"sup1" [1 2]}
- (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (StormCommon/getStormId (:storm-cluster-state cluster) "topology1"))
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
- (get-storm-id (:storm-cluster-state cluster) "topology2"))
+ (StormCommon/getStormId (:storm-cluster-state cluster) "topology2"))
)))
-