You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:24 UTC
[39/50] [abbrv] storm git commit: Merge remote-tracking branch
'apache/nimbus-ha-branch' into ha-merge
Merge remote-tracking branch 'apache/nimbus-ha-branch' into ha-merge
Conflicts:
STORM-UI-REST-API.md
conf/defaults.yaml
storm-core/src/clj/backtype/storm/daemon/nimbus.clj
storm-core/src/clj/backtype/storm/ui/core.clj
storm-core/src/jvm/backtype/storm/Config.java
storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d1afefde
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d1afefde
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d1afefde
Branch: refs/heads/master
Commit: d1afefde51e34dc993591ad79d3fe217bef86f87
Parents: c54cea1 765e4c2
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Aug 11 22:31:02 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Aug 11 22:31:02 2015 -0700
----------------------------------------------------------------------
STORM-UI-REST-API.md | 40 +-
conf/defaults.yaml | 7 +-
.../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes
.../images/nimbus_ha_topology_submission.png | Bin 0 -> 134180 bytes
docs/documentation/nimbus-ha-design.md | 217 +++++
.../ha/codedistributor/HDFSCodeDistributor.java | 101 +++
pom.xml | 16 +
storm-core/pom.xml | 16 +
storm-core/src/clj/backtype/storm/cluster.clj | 57 +-
.../backtype/storm/command/shell_submission.clj | 9 +-
storm-core/src/clj/backtype/storm/config.clj | 15 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 243 ++++--
.../clj/backtype/storm/daemon/supervisor.clj | 53 +-
storm-core/src/clj/backtype/storm/thrift.clj | 23 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 72 +-
storm-core/src/clj/backtype/storm/zookeeper.clj | 94 ++-
storm-core/src/jvm/backtype/storm/Config.java | 42 +-
.../storm/codedistributor/ICodeDistributor.java | 56 ++
.../LocalFileSystemCodeDistributor.java | 106 +++
.../storm/generated/ClusterSummary.java | 292 ++++---
.../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++++++++++
.../backtype/storm/generated/TopologyInfo.java | 221 +++--
.../storm/generated/TopologySummary.java | 107 ++-
.../backtype/storm/nimbus/ILeaderElector.java | 60 ++
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 93 +++
.../jvm/backtype/storm/utils/NimbusClient.java | 63 +-
.../src/jvm/backtype/storm/utils/Utils.java | 9 +
storm-core/src/py/storm/ttypes.py | 613 ++++++++------
storm-core/src/storm.thrift | 12 +-
storm-core/src/ui/public/index.html | 21 +
.../public/templates/index-page-template.html | 58 +-
.../templates/topology-page-template.html | 6 +
.../test/clj/backtype/storm/cluster_test.clj | 23 +-
.../test/clj/backtype/storm/nimbus_test.clj | 210 +++--
.../backtype/storm/security/auth/auth_test.clj | 4 +-
.../storm/security/auth/nimbus_auth_test.clj | 14 +-
.../test/clj/backtype/storm/supervisor_test.clj | 1 +
.../test/clj/backtype/storm/utils_test.clj | 12 -
38 files changed, 3124 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --cc STORM-UI-REST-API.md
index 2836105,baaca84..35ba6ed
--- a/STORM-UI-REST-API.md
+++ b/STORM-UI-REST-API.md
@@@ -231,7 -263,13 +263,8 @@@ Response fields
|bolts.errorLapsedSecs| Integer |Number of seconds elapsed since that last error happened in a bolt|
|bolts.errorWorkerLogLink| String | Link to the worker log that reported the exception |
|bolts.emitted| Long |Number of tuples emitted|
-|antiForgeryToken| String | CSRF token|
+ |replicationCount| Integer |Number of nimbus hosts on which this topology code is replicated|
-Caution: users need to unescape the antiForgeryToken value before using this token to make POST calls(simple-json escapes forward slashes)
-[ISSUE-8](https://code.google.com/p/json-simple/issues/detail?id=8)
-
-
Examples:
```no-highlight
@@@ -375,7 -413,9 +408,8 @@@ Sample response
"storm.zookeeper.retry.intervalceiling.millis": 30000,
"supervisor.enable": true,
"storm.messaging.netty.server_worker_threads": 1
- }
+ },
- "antiForgeryToken": "lAFTN\/5iSedRLwJeUNqkJ8hgYubRl2OxjXGoDf9A4Bt1nZY3rvJW0\/P4zqu9yAk\/LvDhlmn7gigw\/z8C",
+ "replicationCount": 1
}
```
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index c3fa372,49584f2..dd69eb6
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -47,10 -47,12 +47,11 @@@ storm.auth.simple-white-list.users: [
storm.auth.simple-acl.users: []
storm.auth.simple-acl.users.commands: []
storm.auth.simple-acl.admins: []
-storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
+storm.meta.serialization.delegate: "backtype.storm.serialization.GzipThriftSerializationDelegate"
+ storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
-storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegate"
### nimbus.* configs are for the master
- nimbus.host: "localhost"
+ nimbus.seeds : ["localhost:6627"]
nimbus.thrift.port: 6627
nimbus.thrift.threads: 64
nimbus.thrift.max_buffer_size: 1048576
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 63e385f,333feec..f75648a
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -284,11 -311,9 +311,12 @@@
(condp = subtree
ASSIGNMENTS-ROOT (if (empty? args)
(issue-callback! assignments-callback)
- (issue-map-callback! assignment-info-callback (first args)))
+ (do
+ (issue-map-callback! assignment-info-callback (first args))
+ (issue-map-callback! assignment-version-callback (first args))
+ (issue-map-callback! assignment-info-with-version-callback (first args))))
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
+ CODE-DISTRIBUTOR-ROOT (issue-callback! code-distributor-callback)
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
;; this should never happen
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index c88e36b,8a2c0fb..35154d3
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -327,9 -341,32 +345,32 @@@
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
- (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/javaSerialize storm-conf))
+ (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf))
+ (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
))
+ (defn- wait-for-desired-code-replication [nimbus conf storm-id]
+ (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
+ max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
+ total-wait-time (atom 0)
+ current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
+ (if (:code-distributor nimbus)
+ (while (and (> min-replication-count @current-replication-count)
+ (or (= -1 max-replication-wait-time)
+ (< @total-wait-time max-replication-wait-time)))
+ (sleep-secs 1)
+ (log-debug "waiting for desired replication to be achieved.
+ min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time
+ "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
+ (swap! total-wait-time inc)
+ (reset! current-replication-count (.getReplicationCount (:code-distributor nimbus) storm-id))))
+ (if (< min-replication-count @current-replication-count)
+ (log-message "desired replication count " min-replication-count " achieved,
+ current-replication-count" @current-replication-count)
+ (log-message "desired replication count of " min-replication-count " not achieved but we have hit the max wait time
+ so moving on with replication count = " @current-replication-count)
+ )))
+
(defn- read-storm-topology [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(Utils/deserialize
@@@ -1258,12 -1323,18 +1330,20 @@@
(:uptime-secs info)
(count ports)
(count (:used-ports info))
- id )
+ id) ]
+ (when-let [version (:version info)] (.set_version sup-sum version))
+ sup-sum
))
- nimbus-uptime ((:uptime nimbus))
bases (topology-bases storm-cluster-state)
+ nimbuses (.nimbuses storm-cluster-state)
+
+ ;;update the isLeader field for each nimbus summary
+ _ (let [leader (.getLeader (:leader-elector nimbus))
+ leader-host (.getHost leader)
+ leader-port (.getPort leader)]
+ (doseq [nimbus-summary nimbuses]
+ (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
topology-summaries (dofor [[id base] bases :when base]
(let [assignment (.assignment-info storm-cluster-state id nil)
topo-summ (TopologySummary. id
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index be8f682,4fc219e..5f819bd
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -37,7 -37,11 +37,8 @@@
(defmulti download-storm-code cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
+ (defmulti mk-code-distributor cluster-mode)
-;; used as part of a map from port to this
-(defrecord LocalAssignment [storm-id executors])
-
(defprotocol SupervisorDaemon
(get-id [this])
(get-conf [this])
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index b0e266c,02c3d90..950b88d
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -25,8 -25,10 +25,9 @@@
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
- (:use [ring.middleware.anti-forgery])
(:use [clojure.string :only [blank? lower-case trim]])
- (:import [backtype.storm.utils Utils])
+ (:import [backtype.storm.utils Utils]
+ [backtype.storm.generated NimbusSummary])
(:import [backtype.storm.generated ExecutorSpecificStats
ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
ErrorInfo ClusterSummary SupervisorSummary TopologySummary
@@@ -293,18 -287,14 +286,21 @@@
(bolt-comp-summs id))]
(sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
-(defn worker-log-link [host port topology-id]
+(defn worker-log-link [host port topology-id secure?]
(let [fname (logs-filename topology-id port)]
- (url-format (str "http://%s:%s/log?file=%s")
- host (*STORM-CONF* LOGVIEWER-PORT) fname)))
+ (if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT))
+ (url-format "https://%s:%s/log?file=%s"
+ host
+ (*STORM-CONF* LOGVIEWER-HTTPS-PORT)
+ fname)
+ (url-format "http://%s:%s/log?file=%s"
+ host
+ (*STORM-CONF* LOGVIEWER-PORT)
+ fname))))
+ (defn nimbus-log-link [host port]
+ (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+
(defn compute-executor-capacity
[^ExecutorSummary e]
(let [stats (.get_stats e)
@@@ -705,8 -711,8 +718,8 @@@
"acked" (get-in stats [:acked k])
"failed" (get-in stats [:failed k])})))
-(defn topology-page [id window include-sys? user]
+(defn topology-page [id window include-sys? user secure?]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [window (if window window ":all-time")
window-hint (window-hint window)
summ (->> (doto
@@@ -738,10 -745,12 +752,11 @@@
"windowHint" window-hint
"msgTimeout" msg-timeout
"topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
- "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys?)
- "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys?)
+ "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys? secure?)
+ "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys? secure?)
"configuration" topology-conf
- "visualizationTable" (stream-boxes visualizer-data)}))))
+ "visualizationTable" (stream-boxes visualizer-data)
- "antiForgeryToken" *anti-forgery-token*
+ "replicationCount" replication-count}))))
(defn spout-output-stats
[stream-summary window]
@@@ -885,11 -894,11 +900,11 @@@
"inputStats" (bolt-input-stats stream-summary window)
"outputStats" (bolt-output-stats stream-summary window)
"executorStats" (bolt-executor-stats
- (.get_id topology-info) executors window include-sys?)}))
+ (.get_id topology-info) executors window include-sys? secure?)}))
(defn component-page
- [topology-id component window include-sys? user]
+ [topology-id component window include-sys? user secure?]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [window (if window window ":all-time")
summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
topology (.getTopology ^Nimbus$Client nimbus topology-id)
@@@ -935,7 -944,7 +950,7 @@@
{:status status
:headers (merge {"Cache-Control" "no-cache, no-store"
"Access-Control-Allow-Origin" "*"
-- "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"}
++ "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, Authorization, X-Requested-With"}
(if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
{"Content-Type" "application/json;charset=utf-8"}))
:body (if (not-nil? callback)
@@@ -965,15 -977,15 +983,15 @@@
(GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
(assert-authorized-user servlet-request "getTopology" (topology-config id))
(json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
- (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request]} id component & m]
+ (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m]
(let [user (.getUserName http-creds-handler servlet-request)]
(assert-authorized-user servlet-request "getTopology" (topology-config id))
- (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
- (GET "/api/v1/token" [ & m]
- (json-response (format "{\"antiForgeryToken\": \"%s\"}" *anti-forgery-token*) (:callback m) :serialize-fn identity))
+ (json-response
+ (component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https))
+ (:callback m))))
(POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
+ (thrift/with-configured-nimbus-connection nimbus
(assert-authorized-user servlet-request "activate" (topology-config id))
- (with-nimbus nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 4628bd4,bd145d5..3cba37c
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -1454,12 -1293,35 +1454,42 @@@ public class Config extends HashMap<Str
public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
/**
+ * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
+ * vs. CPU usage
+ */
+ public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+ public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
+
++ /**
+ * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
+ * distribution.
+ */
+ public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
+ public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = String.class;
+
+ /**
+ * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
+ * is allowed to perform topology activation tasks like setting up heartbeats/assignments
+ * and marking the topology as active. default is 0.
+ */
+ public static final String TOPOLOGY_MIN_REPLICATION_COUNT = "topology.min.replication.count";
+ public static final Object TOPOLOGY_MIN_REPLICATION_COUNT_SCHEMA = Number.class;
+
+ /**
+ * Maximum wait time for the nimbus host replication to achieve the nimbus.min.replication.count.
+ * Once this time is elapsed nimbus will go ahead and perform topology activation tasks even
+ * if required nimbus.min.replication.count is not achieved. The default is 0 seconds, a value of
+ * -1 indicates to wait for ever.
+ */
- public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "nimbus.max.replication.wait.time.sec";
++ public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "topology.max.replication.wait.time.sec";
+ public static final Object TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC_SCHEMA = Number.class;
+
+ /**
+ * How often nimbus's background thread to sync code for missing topologies should run.
+ */
+ public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
+ public static final Object NIMBUS_CODE_SYNC_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
+
public static void setClasspath(Map conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --cc storm-core/src/storm.thrift
index a4b0b2a,839f6da..a585924
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@@ -153,13 -154,20 +154,21 @@@ struct SupervisorSummary
3: required i32 num_workers;
4: required i32 num_used_workers;
5: required string supervisor_id;
+ 6: optional string version = "VERSION_NOT_PROVIDED";
}
+ struct NimbusSummary {
+ 1: required string host;
+ 2: required i32 port;
+ 3: required i32 uptime_secs;
+ 4: required bool isLeader;
+ 5: required string version;
+ }
+
struct ClusterSummary {
1: required list<SupervisorSummary> supervisors;
- 2: required i32 nimbus_uptime_secs;
3: required list<TopologySummary> topologies;
+ 4: required list<NimbusSummary> nimbuses;
}
struct ErrorInfo {
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/nimbus_test.clj
index 00fb6d6,057dd30..cbd88c4
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@@ -1180,32 -1277,3 +1277,33 @@@
(is (thrown-cause? InvalidTopologyException
(submit-local-topology-with-opts nimbus "test" bad-config topology
(SubmitOptions.))))))))
+
+(deftest test-stateless-with-scheduled-topology-to-be-killed
+ ; tests regression of STORM-856
+ (with-inprocess-zookeeper zk-port
+ (with-local-tmp [nimbus-dir]
+ (letlocals
+ (bind conf (merge (read-storm-config)
+ {STORM-ZOOKEEPER-SERVERS ["localhost"]
+ STORM-CLUSTER-MODE "local"
+ STORM-ZOOKEEPER-PORT zk-port
+ STORM-LOCAL-DIR nimbus-dir}))
+ (bind cluster-state (cluster/mk-storm-cluster-state conf))
+ (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
++ (sleep-secs 1)
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+ {}))
+ (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
+ ; make transition for topology t1 to be killed -> nimbus applies this event to cluster state
+ (.killTopology nimbus "t1")
+ ; shutdown nimbus immediately to achieve nimbus doesn't handle event right now
+ (.shutdown nimbus)
+
+ ; in startup of nimbus it reads cluster state and take proper actions
+ ; in this case nimbus registers topology transition event to scheduler again
+ ; before applying STORM-856 nimbus was killed with NPE
+ (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
+ (.shutdown nimbus)
+ (.disconnect cluster-state)
+ ))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------