You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:18 UTC

[22/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatability

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
new file mode 100644
index 0000000..5aef266
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -0,0 +1,691 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.cluster
+  (:import [org.apache.zookeeper.data Stat ACL Id]
+           [org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
+            LogConfig ProfileAction ProfileRequest NodeInfo]
+           [java.io Serializable])
+  (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
+  (:import [org.apache.curator.framework CuratorFramework])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
+  (:import [java.security MessageDigest])
+  (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
+  (:import [org.apache.storm.nimbus NimbusInfo])
+  (:use [org.apache.storm util log config converter])
+  (:require [org.apache.storm [zookeeper :as zk]])
+  (:require [org.apache.storm.daemon [common :as common]]))
+
+(defn mk-topo-only-acls
+  [topo-conf]
+  (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
+    (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
+      [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+       (ACL. ZooDefs$Perms/READ (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))])))
+ 
+(defnk mk-distributed-cluster-state
+  [conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
+  (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
+                                 "org.apache.storm.cluster_state.zookeeper_state_factory"))
+        state-instance (.newInstance clazz)]
+    (log-debug "Creating cluster state: " (.toString clazz))
+    (or (.mkState state-instance conf auth-conf acls context)
+        nil)))
+
+(defprotocol StormClusterState
+  (assignments [this callback])
+  (assignment-info [this storm-id callback])
+  (assignment-info-with-version [this storm-id callback])
+  (assignment-version [this storm-id callback])
+  ;returns key information under /storm/blobstore/key
+  (blobstore-info [this blob-key])
+  ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
+  (nimbuses [this])
+  ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
+  (add-nimbus-host! [this nimbus-id nimbus-summary])
+
+  (active-storms [this])
+  (storm-base [this storm-id callback])
+  (get-worker-heartbeat [this storm-id node port])
+  (get-worker-profile-requests [this storm-id nodeinfo thrift?])
+  (get-topology-profile-requests [this storm-id thrift?])
+  (set-worker-profile-request [this storm-id profile-request])
+  (delete-topology-profile-requests [this storm-id profile-request])
+  (executor-beats [this storm-id executor->node+port])
+  (supervisors [this callback])
+  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
+  (setup-heartbeats! [this storm-id])
+  (teardown-heartbeats! [this storm-id])
+  (teardown-topology-errors! [this storm-id])
+  (heartbeat-storms [this])
+  (error-topologies [this])
+  (set-topology-log-config! [this storm-id log-config])
+  (topology-log-config [this storm-id cb])
+  (worker-heartbeat! [this storm-id node port info])
+  (remove-worker-heartbeat! [this storm-id node port])
+  (supervisor-heartbeat! [this supervisor-id info])
+  (worker-backpressure! [this storm-id node port info])
+  (topology-backpressure [this storm-id callback])
+  (setup-backpressure! [this storm-id])
+  (remove-worker-backpressure! [this storm-id node port])
+  (activate-storm! [this storm-id storm-base])
+  (update-storm! [this storm-id new-elems])
+  (remove-storm-base! [this storm-id])
+  (set-assignment! [this storm-id info])
+  ;; sets up information related to key consisting of nimbus
+  ;; host:port and version info of the blob
+  (setup-blobstore! [this key nimbusInfo versionInfo])
+  (active-keys [this])
+  (blobstore [this callback])
+  (remove-storm! [this storm-id])
+  (remove-blobstore-key! [this blob-key])
+  (remove-key-version! [this blob-key])
+  (report-error [this storm-id component-id node port error])
+  (errors [this storm-id component-id])
+  (last-error [this storm-id component-id])
+  (set-credentials! [this storm-id creds topo-conf])
+  (credentials [this storm-id callback])
+  (disconnect [this]))
+
+(def ASSIGNMENTS-ROOT "assignments")
+(def CODE-ROOT "code")
+(def STORMS-ROOT "storms")
+(def SUPERVISORS-ROOT "supervisors")
+(def WORKERBEATS-ROOT "workerbeats")
+(def BACKPRESSURE-ROOT "backpressure")
+(def ERRORS-ROOT "errors")
+(def BLOBSTORE-ROOT "blobstore")
+; Stores the latest update sequence for a blob
+(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
+(def NIMBUSES-ROOT "nimbuses")
+(def CREDENTIALS-ROOT "credentials")
+(def LOGCONFIG-ROOT "logconfigs")
+(def PROFILERCONFIG-ROOT "profilerconfigs")
+
+(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
+(def STORMS-SUBTREE (str "/" STORMS-ROOT))
+(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
+(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
+(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
+(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
+;; Blobstore subtree /storm/blobstore
+(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
+(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
+(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
+(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
+(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
+(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT))
+
+(defn supervisor-path
+  [id]
+  (str SUPERVISORS-SUBTREE "/" id))
+
+(defn assignment-path
+  [id]
+  (str ASSIGNMENTS-SUBTREE "/" id))
+
+(defn blobstore-path
+  [key]
+  (str BLOBSTORE-SUBTREE "/" key))
+
+(defn blobstore-max-key-sequence-number-path
+  [key]
+  (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))
+
+(defn nimbus-path
+  [id]
+  (str NIMBUSES-SUBTREE "/" id))
+
+(defn storm-path
+  [id]
+  (str STORMS-SUBTREE "/" id))
+
+(defn workerbeat-storm-root
+  [storm-id]
+  (str WORKERBEATS-SUBTREE "/" storm-id))
+
+(defn workerbeat-path
+  [storm-id node port]
+  (str (workerbeat-storm-root storm-id) "/" node "-" port))
+
+(defn backpressure-storm-root
+  [storm-id]
+  (str BACKPRESSURE-SUBTREE "/" storm-id))
+
+(defn backpressure-path
+  [storm-id node port]
+  (str (backpressure-storm-root storm-id) "/" node "-" port))
+
+(defn error-storm-root
+  [storm-id]
+  (str ERRORS-SUBTREE "/" storm-id))
+
+(defn error-path
+  [storm-id component-id]
+  (str (error-storm-root storm-id) "/" (url-encode component-id)))
+
+(def last-error-path-seg "last-error")
+
+(defn last-error-path
+  [storm-id component-id]
+  (str (error-storm-root storm-id)
+       "/"
+       (url-encode component-id)
+       "-"
+       last-error-path-seg))
+
+(defn credentials-path
+  [storm-id]
+  (str CREDENTIALS-SUBTREE "/" storm-id))
+
+(defn log-config-path
+  [storm-id]
+  (str LOGCONFIG-SUBTREE "/" storm-id))
+
+(defn profiler-config-path
+  ([storm-id]
+   (str PROFILERCONFIG-SUBTREE "/" storm-id))
+  ([storm-id host port request-type]
+   (str (profiler-config-path storm-id) "/" host "_" port "_" request-type)))
+
+(defn- issue-callback!
+  [cb-atom]
+  (let [cb @cb-atom]
+    (reset! cb-atom nil)
+    (when cb
+      (cb))))
+
+(defn- issue-map-callback!
+  [cb-atom id]
+  (let [cb (@cb-atom id)]
+    (swap! cb-atom dissoc id)
+    (when cb
+      (cb id))))
+
+(defn- maybe-deserialize
+  [ser clazz]
+  (when ser
+    (Utils/deserialize ser clazz)))
+
+(defrecord TaskError [error time-secs host port])
+
+(defn- parse-error-path
+  [^String p]
+  (Long/parseLong (.substring p 1)))
+
+(defn convert-executor-beats
+  "Ensures that we only return heartbeats for executors assigned to
+  this worker."
+  [executors worker-hb]
+  (let [executor-stats (:executor-stats worker-hb)]
+    (->> executors
+         (map (fn [t]
+                (if (contains? executor-stats t)
+                  {t {:time-secs (:time-secs worker-hb)
+                      :uptime (:uptime worker-hb)
+                      :stats (get executor-stats t)}})))
+         (into {}))))
+
+;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
+(defnk mk-storm-cluster-state
+  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
+  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
+                                [false cluster-state-spec]
+                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
+        assignment-info-callback (atom {})
+        assignment-info-with-version-callback (atom {})
+        assignment-version-callback (atom {})
+        supervisors-callback (atom nil)
+        backpressure-callback (atom {})   ;; we want to reigister a topo directory getChildren callback for all workers of this dir
+        assignments-callback (atom nil)
+        storm-base-callback (atom {})
+        blobstore-callback (atom nil)
+        credentials-callback (atom {})
+        log-config-callback (atom {})
+        state-id (.register
+                  cluster-state
+                  (fn [type path]
+                    (let [[subtree & args] (tokenize-path path)]
+                      (condp = subtree
+                         ASSIGNMENTS-ROOT (if (empty? args)
+                                             (issue-callback! assignments-callback)
+                                             (do
+                                               (issue-map-callback! assignment-info-callback (first args))
+                                               (issue-map-callback! assignment-version-callback (first args))
+                                               (issue-map-callback! assignment-info-with-version-callback (first args))))
+                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
+                         BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore
+                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
+                         CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
+                         LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
+                         BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
+                         ;; this should never happen
+                         (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
+    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
+               LOGCONFIG-SUBTREE]]
+      (.mkdirs cluster-state p acls))
+    (reify
+      StormClusterState
+
+      (assignments
+        [this callback]
+        (when callback
+          (reset! assignments-callback callback))
+        (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
+
+      (assignment-info
+        [this storm-id callback]
+        (when callback
+          (swap! assignment-info-callback assoc storm-id callback))
+        (clojurify-assignment (maybe-deserialize (.get_data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment)))
+
+      (assignment-info-with-version 
+        [this storm-id callback]
+        (when callback
+          (swap! assignment-info-with-version-callback assoc storm-id callback))
+        (let [{data :data version :version} 
+              (.get_data_with_version cluster-state (assignment-path storm-id) (not-nil? callback))]
+        {:data (clojurify-assignment (maybe-deserialize data Assignment))
+         :version version}))
+
+      (assignment-version 
+        [this storm-id callback]
+        (when callback
+          (swap! assignment-version-callback assoc storm-id callback))
+        (.get_version cluster-state (assignment-path storm-id) (not-nil? callback)))
+
+      ;; blobstore state
+      (blobstore
+        [this callback]
+        (when callback
+          (reset! blobstore-callback callback))
+        (.sync_path cluster-state BLOBSTORE-SUBTREE)
+        (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))
+
+      (nimbuses
+        [this]
+        (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %1) false) NimbusSummary)
+          (.get_children cluster-state NIMBUSES-SUBTREE false)))
+
+      (add-nimbus-host!
+        [this nimbus-id nimbus-summary]
+        ;explicit delete for ephmeral node to ensure this session creates the entry.
+        (.delete_node cluster-state (nimbus-path nimbus-id))
+
+        (.add_listener cluster-state (reify ClusterStateListener
+                        (^void stateChanged[this ^ConnectionState newState]
+                          (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState)
+                          (if (.equals newState ConnectionState/RECONNECTED)
+                            (do
+                              (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time")
+                              (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))))))
+        
+        (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
+
+      (setup-blobstore!
+        [this key nimbusInfo versionInfo]
+        (let [path (str (blobstore-path key) "/" (.toHostPortString nimbusInfo) "-" versionInfo)]
+          (log-message "setup-path" path)
+          (.mkdirs cluster-state (blobstore-path key) acls)
+          ;we delete the node first to ensure the node gets created as part of this session only.
+          (.delete_node_blobstore cluster-state (str (blobstore-path key)) (.toHostPortString nimbusInfo))
+          (.set_ephemeral_node cluster-state path nil acls)))
+
+      (blobstore-info
+        [this blob-key]
+        (let [path (blobstore-path blob-key)]
+          (.sync_path cluster-state path)
+          (.get_children cluster-state path false)))
+
+      (active-storms
+        [this]
+        (.get_children cluster-state STORMS-SUBTREE false))
+
+      (active-keys
+        [this]
+        (.get_children cluster-state BLOBSTORE-SUBTREE false))
+
+      (heartbeat-storms
+        [this]
+        (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))
+
+      (error-topologies
+        [this]
+        (.get_children cluster-state ERRORS-SUBTREE false))
+
+      (get-worker-heartbeat
+        [this storm-id node port]
+        (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path storm-id node port) false)]
+          (if worker-hb
+            (-> worker-hb
+              (maybe-deserialize ClusterWorkerHeartbeat)
+              clojurify-zk-worker-hb))))
+
+      (executor-beats
+        [this storm-id executor->node+port]
+        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
+        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
+        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
+        ;; we avoid situations like that
+        (let [node+port->executors (reverse-map executor->node+port)
+              all-heartbeats (for [[[node port] executors] node+port->executors]
+                               (->> (get-worker-heartbeat this storm-id node port)
+                                    (convert-executor-beats executors)
+                                    ))]
+          (apply merge all-heartbeats)))
+
+      (supervisors
+        [this callback]
+        (when callback
+          (reset! supervisors-callback callback))
+        (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
+
+      (supervisor-info
+        [this supervisor-id]
+        (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo)))
+
+      (topology-log-config
+        [this storm-id cb]
+        (when cb
+          (swap! log-config-callback assoc storm-id cb))
+        (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig))
+
+      (set-topology-log-config!
+        [this storm-id log-config]
+        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))
+
+      (set-worker-profile-request
+        [this storm-id profile-request]
+        (let [request-type (.get_action profile-request)
+              host (.get_node (.get_nodeInfo profile-request))
+              port (first (.get_port (.get_nodeInfo profile-request)))]
+          (.set_data cluster-state
+                     (profiler-config-path storm-id host port request-type)
+                     (Utils/serialize profile-request)
+                     acls)))
+
+      (get-topology-profile-requests
+        [this storm-id thrift?]
+        (let [path (profiler-config-path storm-id)
+              requests (if (.node_exists cluster-state path false)
+                         (dofor [c (.get_children cluster-state path false)]
+                                (let [raw (.get_data cluster-state (str path "/" c) false)
+                                      request (maybe-deserialize raw ProfileRequest)]
+                                      (if thrift?
+                                        request
+                                        (clojurify-profile-request request)))))]
+          requests))
+
+      (delete-topology-profile-requests
+        [this storm-id profile-request]
+        (let [profile-request-inst (thriftify-profile-request profile-request)
+              action (:action profile-request)
+              host (:host profile-request)
+              port (:port profile-request)]
+          (.delete_node cluster-state
+           (profiler-config-path storm-id host port action))))
+          
+      (get-worker-profile-requests
+        [this storm-id node-info thrift?]
+        (let [host (:host node-info)
+              port (:port node-info)
+              profile-requests (get-topology-profile-requests this storm-id thrift?)]
+          (if thrift?
+            (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo  %)))))
+                    profile-requests)
+            (filter #(and (= host (:host %)) (= port (:port %)))
+                    profile-requests))))
+      
+      (worker-heartbeat!
+        [this storm-id node port info]
+        (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
+          (if thrift-worker-hb
+            (.set_worker_hb cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls))))
+
+      (remove-worker-heartbeat!
+        [this storm-id node port]
+        (.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))
+
+      (setup-heartbeats!
+        [this storm-id]
+        (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
+
+      (teardown-heartbeats!
+        [this storm-id]
+        (try-cause
+          (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id))
+          (catch KeeperException e
+            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
+
+      (worker-backpressure!
+        [this storm-id node port on?]
+        "if znode exists and to be not on?, delete; if exists and on?, do nothing;
+        if not exists and to be on?, create; if not exists and not on?, do nothing"
+        (let [path (backpressure-path storm-id node port)
+              existed (.node_exists cluster-state path false)]
+          (if existed
+            (if (not on?)
+              (.delete_node cluster-state path))   ;; delete the znode since the worker is not congested
+            (if on?
+              (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested
+    
+      (topology-backpressure
+        [this storm-id callback]
+        "if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not."
+        (when callback
+          (swap! backpressure-callback assoc storm-id callback))
+        (let [path (backpressure-storm-root storm-id)
+              children (.get_children cluster-state path (not-nil? callback))]
+              (> (count children) 0)))
+      
+      (setup-backpressure!
+        [this storm-id]
+        (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
+
+      (remove-worker-backpressure!
+        [this storm-id node port]
+        (.delete_node cluster-state (backpressure-path storm-id node port)))
+
+      (teardown-topology-errors!
+        [this storm-id]
+        (try-cause
+          (.delete_node cluster-state (error-storm-root storm-id))
+          (catch KeeperException e
+            (log-warn-error e "Could not teardown errors for " storm-id))))
+
+      (supervisor-heartbeat!
+        [this supervisor-id info]
+        (let [thrift-supervisor-info (thriftify-supervisor-info info)]
+          (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))
+
+      (activate-storm!
+        [this storm-id storm-base]
+        (let [thrift-storm-base (thriftify-storm-base storm-base)]
+          (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))
+
+      (update-storm!
+        [this storm-id new-elems]
+        (let [base (storm-base this storm-id nil)
+              executors (:component->executors base)
+              component->debug (:component->debug base)
+              new-elems (update new-elems :component->executors (partial merge executors))
+              new-elems (update new-elems :component->debug (partial merge-with merge component->debug))]
+          (.set_data cluster-state (storm-path storm-id)
+                    (-> base
+                        (merge new-elems)
+                        thriftify-storm-base
+                        Utils/serialize)
+                    acls)))
+
+      (storm-base
+        [this storm-id callback]
+        (when callback
+          (swap! storm-base-callback assoc storm-id callback))
+        (clojurify-storm-base (maybe-deserialize (.get_data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase)))
+
+      (remove-storm-base!
+        [this storm-id]
+        (.delete_node cluster-state (storm-path storm-id)))
+
+      (set-assignment!
+        [this storm-id info]
+        (let [thrift-assignment (thriftify-assignment info)]
+          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
+
+      (remove-blobstore-key!
+        [this blob-key]
+        (log-debug "removing key" blob-key)
+        (.delete_node cluster-state (blobstore-path blob-key)))
+
+      (remove-key-version!
+        [this blob-key]
+        (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key)))
+
+      (remove-storm!
+        [this storm-id]
+        (.delete_node cluster-state (assignment-path storm-id))
+        (.delete_node cluster-state (credentials-path storm-id))
+        (.delete_node cluster-state (log-config-path storm-id))
+        (.delete_node cluster-state (profiler-config-path storm-id))
+        (remove-storm-base! this storm-id))
+
+      (set-credentials!
+         [this storm-id creds topo-conf]
+         (let [topo-acls (mk-topo-only-acls topo-conf)
+               path (credentials-path storm-id)
+               thriftified-creds (thriftify-credentials creds)]
+           (.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))
+
+      (credentials
+        [this storm-id callback]
+        (when callback
+          (swap! credentials-callback assoc storm-id callback))
+        (clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))
+
+      (report-error
+         [this storm-id component-id node port error]
+         (let [path (error-path storm-id component-id)
+               last-error-path (last-error-path storm-id component-id)
+               data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
+               _ (.mkdirs cluster-state path acls)
+               ser-data (Utils/serialize data)
+               _ (.mkdirs cluster-state path acls)
+               _ (.create_sequential cluster-state (str path "/e") ser-data acls)
+               _ (.set_data cluster-state last-error-path ser-data acls)
+               to-kill (->> (.get_children cluster-state path false)
+                            (sort-by parse-error-path)
+                            reverse
+                            (drop 10))]
+           (doseq [k to-kill]
+             (.delete_node cluster-state (str path "/" k)))))
+
+      (errors
+         [this storm-id component-id]
+         (let [path (error-path storm-id component-id)
+               errors (if (.node_exists cluster-state path false)
+                        (dofor [c (.get_children cluster-state path false)]
+                          (if-let [data (-> (.get_data cluster-state
+                                                      (str path "/" c)
+                                                      false)
+                                          (maybe-deserialize ErrorInfo)
+                                          clojurify-error)]
+                            (map->TaskError data)))
+                        ())]
+           (->> (filter not-nil? errors)
+                (sort-by (comp - :time-secs)))))
+
+      (last-error
+        [this storm-id component-id]
+        (let [path (last-error-path storm-id component-id)]
+          (if (.node_exists cluster-state path false)
+            (if-let [data (-> (.get_data cluster-state path false)
+                              (maybe-deserialize ErrorInfo)
+                              clojurify-error)]
+              (map->TaskError data)))))
+      
+      (disconnect
+         [this]
+        (.unregister cluster-state state-id)
+        (when solo?
+          (.close cluster-state))))))
+
+;; daemons have a single thread that will respond to events
+;; start with initialize event
+;; callbacks add events to the thread's queue
+
+;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified.
+;; master gives orders through state, and client records status in state (ephemerally)
+
+;; master tells nodes what workers to launch
+
+;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified
+;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up
+;; /assignments/{storm id}
+
+;; which tasks they talk to, etc. (immutable until shutdown)
+;; everyone reads this in full to understand structure
+;; /tasks/{storm id}/{task id} ; just contains bolt id
+
+;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
+;; /supervisors/status/{ephemeral node ids}  ;; node metadata such as port ranges are kept here
+
+;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
+;; /taskbeats/{storm id}/{ephemeral task id}
+
+;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown
+;; master manipulates
+;; /storms/{storm id}
+
+;; Zookeeper flows:
+
+;; Master:
+;; job submit:
+;; 1. read which nodes are available
+;; 2. set up the worker/{storm}/{task} stuff (static)
+;; 3. set assignments
+;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
+
+;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
+;; 1. read assignment
+;; 2. see which tasks/nodes are up
+;; 3. make new assignment to fix any problems
+;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
+
+;; masters only possible watches is on ephemeral nodes and tasks, and maybe not even
+
+;; Supervisor:
+;; 1. monitor /storms/* and assignments
+;; 2. local state about which workers are local
+;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments
+;; 4. when storm is off, monitor tasks for workers - when they all die or don't hearbeat, kill the process and cleanup
+
+;; Worker:
+;; 1. On startup, start the tasks if the storm is on
+
+;; Task:
+;; 1. monitor assignments, reroute when assignments change
+;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off
+
+;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
+;; supervisor periodically checks to make sure processes are alive
+;; {rootdir}/workers/{storm id}/{worker id}   ;; contains pid inside
+
+;; all tasks in a worker share the same cluster state
+;; workers, supervisors, and tasks subscribes to storm to know when it's started or stopped
+;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear)
+;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9)
+;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
new file mode 100644
index 0000000..3104c52
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
@@ -0,0 +1,161 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.cluster-state.zookeeper-state-factory
+  (:import [org.apache.curator.framework.state ConnectionStateListener])
+  (:import [org.apache.zookeeper KeeperException$NoNodeException]
+           [org.apache.storm.cluster ClusterState DaemonType])
+  (:use [org.apache.storm cluster config log util])
+  (:require [org.apache.storm [zookeeper :as zk]])
+  (:gen-class
+   :implements [org.apache.storm.cluster.ClusterStateFactory]))
+
+(defn -mkState [this conf auth-conf acls context]
+  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
+    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
+    (.close zk))
+  (let [callbacks (atom {})
+        active (atom true)
+        zk-writer (zk/mk-client conf
+                         (conf STORM-ZOOKEEPER-SERVERS)
+                         (conf STORM-ZOOKEEPER-PORT)
+                         :auth-conf auth-conf
+                         :root (conf STORM-ZOOKEEPER-ROOT)
+                         :watcher (fn [state type path]
+                                    (when @active
+                                      (when-not (= :connected state)
+                                        (log-warn "Received event " state ":" type ":" path " with disconnected Writer Zookeeper."))
+                                      (when-not (= :none type)
+                                        (doseq [callback (vals @callbacks)]
+                                          (callback type path))))))
+        is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS)
+        zk-reader (if is-nimbus?
+                    (zk/mk-client conf
+                         (conf STORM-ZOOKEEPER-SERVERS)
+                         (conf STORM-ZOOKEEPER-PORT)
+                         :auth-conf auth-conf
+                         :root (conf STORM-ZOOKEEPER-ROOT)
+                         :watcher (fn [state type path]
+                                    (when @active
+                                      (when-not (= :connected state)
+                                        (log-warn "Received event " state ":" type ":" path " with disconnected Reader Zookeeper."))
+                                      (when-not (= :none type)
+                                        (doseq [callback (vals @callbacks)]
+                                          (callback type path))))))
+                    zk-writer)]
+    (reify
+     ClusterState
+
+     (register
+       [this callback]
+       (let [id (uuid)]
+         (swap! callbacks assoc id callback)
+         id))
+
+     (unregister
+       [this id]
+       (swap! callbacks dissoc id))
+
+     (set-ephemeral-node
+       [this path data acls]
+       (zk/mkdirs zk-writer (parent-path path) acls)
+       (if (zk/exists zk-writer path false)
+         (try-cause
+           (zk/set-data zk-writer path data) ; should verify that it's ephemeral
+           (catch KeeperException$NoNodeException e
+             (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
+             (zk/create-node zk-writer path data :ephemeral acls)))
+         (zk/create-node zk-writer path data :ephemeral acls)))
+
+     (create-sequential
+       [this path data acls]
+       (zk/create-node zk-writer path data :sequential acls))
+
+     (set-data
+       [this path data acls]
+       ;; note: this does not turn off any existing watches
+       (if (zk/exists zk-writer path false)
+         (zk/set-data zk-writer path data)
+         (do
+           (zk/mkdirs zk-writer (parent-path path) acls)
+           (zk/create-node zk-writer path data :persistent acls))))
+
+     (set-worker-hb
+       [this path data acls]
+       (.set_data this path data acls))
+
+     (delete-node
+       [this path]
+       (zk/delete-node zk-writer path))
+
+     (delete-worker-hb
+       [this path]
+       (.delete_node this path))
+
+     (get-data
+       [this path watch?]
+       (zk/get-data zk-reader path watch?))
+
+     (get-data-with-version
+       [this path watch?]
+       (zk/get-data-with-version zk-reader path watch?))
+
+     (get-version
+       [this path watch?]
+       (zk/get-version zk-reader path watch?))
+
+     (get-worker-hb
+       [this path watch?]
+       (.get_data this path watch?))
+
+     (get-children
+       [this path watch?]
+       (zk/get-children zk-reader path watch?))
+
+     (get-worker-hb-children
+       [this path watch?]
+       (.get_children this path watch?))
+
+     (mkdirs
+       [this path acls]
+       (zk/mkdirs zk-writer path acls))
+
+     (node-exists
+       [this path watch?]
+       (zk/exists-node? zk-reader path watch?))
+
+     (add-listener
+       [this listener]
+       (let [curator-listener (reify ConnectionStateListener
+                                (stateChanged
+                                  [this client newState]
+                                  (.stateChanged listener client newState)))]
+         (zk/add-listener zk-reader curator-listener)))
+
+     (sync-path
+       [this path]
+       (zk/sync-path zk-writer path))
+
+      (delete-node-blobstore
+        [this path nimbus-host-port-info]
+        (zk/delete-node-blobstore zk-writer path nimbus-host-port-info))
+
+     (close
+       [this]
+       (reset! active false)
+       (.close zk-writer)
+       (if is-nimbus?
+         (.close zk-reader))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/activate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/activate.clj b/storm-core/src/clj/org/apache/storm/command/activate.clj
new file mode 100644
index 0000000..dc452e8
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/activate.clj
@@ -0,0 +1,24 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.activate
+  (:use [org.apache.storm thrift log])
+  (:gen-class))
+
+(defn -main [name] 
+  (with-configured-nimbus-connection nimbus
+    (.activate nimbus name)
+    (log-message "Activated topology: " name)
+    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
new file mode 100644
index 0000000..b1496db
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
@@ -0,0 +1,162 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.blobstore
+  (:import [java.io InputStream OutputStream]
+           [org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
+            KeyNotFoundException]
+           [org.apache.storm.blobstore BlobStoreAclHandler])
+  (:use [org.apache.storm config]
+        [clojure.string :only [split]]
+        [clojure.tools.cli :only [cli]]
+        [clojure.java.io :only [copy input-stream output-stream]]
+        [org.apache.storm blobstore log util])
+  (:gen-class))
+
+(defn update-blob-from-stream
+  "Update a blob in the blob store from an InputStream"
+  [key ^InputStream in]
+  (with-configured-blob-client blobstore
+    (let [out (.updateBlob blobstore key)]
+      (try 
+        (copy in out)
+        (.close out)
+        (catch Exception e
+          (log-message e)
+          (.cancel out)
+          (throw e))))))
+
+(defn create-blob-from-stream
+  "Create a blob in the blob store from an InputStream"
+  [key ^InputStream in ^SettableBlobMeta meta]
+  (with-configured-blob-client blobstore
+    (let [out (.createBlob blobstore key meta)]
+      (try 
+        (copy in out)
+        (.close out)
+        (catch Exception e
+          (.cancel out)
+          (throw e))))))
+
+(defn read-blob
+  "Read a blob in the blob store and write to an OutputStream"
+  [key ^OutputStream out]
+  (with-configured-blob-client blobstore
+    (with-open [in (.getBlob blobstore key)]
+      (copy in out))))
+
+(defn as-access-control
+  "Convert a parameter to an AccessControl object"
+  [param]
+  (BlobStoreAclHandler/parseAccessControl (str param)))
+
+(defn as-acl
+  [param]
+  (map as-access-control (split param #",")))
+
+(defn access-control-str
+  [^AccessControl acl]
+  (BlobStoreAclHandler/accessControlToString acl))
+
+(defn read-cli [args]
+  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
+    (if file
+      (with-open [f (output-stream file)]
+        (read-blob key f))
+      (read-blob key System/out))))
+
+(defn update-cli [args]
+  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
+    (if file
+      (with-open [f (input-stream file)]
+        (update-blob-from-stream key f))
+      (update-blob-from-stream key System/in))
+    (log-message "Successfully updated " key)))
+
+(defn create-cli [args]
+  (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
+                                                  ["-a" "--acl" :default [] :parse-fn as-acl]
+                                                  ["-r" "--replication-factor" :default -1 :parse-fn parse-int])
+        meta (doto (SettableBlobMeta. acl)
+                   (.set_replication_factor replication-factor))]
+    (validate-key-name! key)
+    (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
+    (if file
+      (with-open [f (input-stream file)]
+        (create-blob-from-stream key f meta))
+      (create-blob-from-stream key System/in meta))
+    (log-message "Successfully created " key)))
+
+(defn delete-cli [args]
+  (with-configured-blob-client blobstore
+    (doseq [key args]
+      (.deleteBlob blobstore key)
+      (log-message "deleted " key))))
+
+(defn list-cli [args]
+  (with-configured-blob-client blobstore
+    (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
+      (doseq [key keys]
+        (try
+          (let [meta (.getBlobMeta blobstore key)
+                version (.get_version meta)
+                acl (.get_acl (.get_settable meta))]
+            (log-message key " " version " " (pr-str (map access-control-str acl))))
+          (catch AuthorizationException ae
+            (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
+          (catch KeyNotFoundException knf
+            (if-not (empty? args) (log-error key " NOT FOUND"))))))))
+
+(defn set-acl-cli [args]
+  (let [[{set-acl :set} [key] _]
+           (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
+    (with-configured-blob-client blobstore
+      (let [meta (.getBlobMeta blobstore key)
+            acl (.get_acl (.get_settable meta))
+            new-acl (if set-acl set-acl acl)
+            new-meta (SettableBlobMeta. new-acl)]
+        (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl)))
+        (.setBlobMeta blobstore key new-meta)))))
+
+(defn rep-cli [args]
+  (let [sub-command (first args)
+        new-args (rest args)]
+    (with-configured-blob-client blobstore
+      (condp = sub-command
+      "--read" (let [key (first new-args)
+                     blob-replication (.getBlobReplication blobstore key)]
+                 (log-message "Current replication factor " blob-replication)
+                 blob-replication)
+      "--update" (let [[{replication-factor :replication-factor} [key] _]
+                        (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])]
+                   (if (nil? replication-factor)
+                     (throw (RuntimeException. (str "Please set the replication factor")))
+                     (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
+                       (log-message "Replication factor is set to " blob-replication)
+                       blob-replication)))
+      :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command")))))))
+
+(defn -main [& args]
+  (let [command (first args)
+        new-args (rest args)]
+    (condp = command
+      "cat" (read-cli new-args)
+      "create" (create-cli new-args)
+      "update" (update-cli new-args)
+      "delete" (delete-cli new-args)
+      "list" (list-cli new-args)
+      "set-acl" (set-acl-cli new-args)
+      "replication" (rep-cli new-args)
+      :else (throw (RuntimeException. (str command " is not a supported blobstore command"))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/config_value.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/config_value.clj b/storm-core/src/clj/org/apache/storm/command/config_value.clj
new file mode 100644
index 0000000..9bc3e92
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/config_value.clj
@@ -0,0 +1,24 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.config-value
+  (:use [org.apache.storm config log])
+  (:gen-class))
+
+
+(defn -main [^String name]
+  (let [conf (read-storm-config)]
+    (println "VALUE:" (conf name))
+    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/deactivate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/deactivate.clj b/storm-core/src/clj/org/apache/storm/command/deactivate.clj
new file mode 100644
index 0000000..4fd2c85
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/deactivate.clj
@@ -0,0 +1,24 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.deactivate
+  (:use [org.apache.storm thrift log])
+  (:gen-class))
+
+(defn -main [name] 
+  (with-configured-nimbus-connection nimbus
+    (.deactivate nimbus name)
+    (log-message "Deactivated topology: " name)
+    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
new file mode 100644
index 0000000..96de02d
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
@@ -0,0 +1,26 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.dev-zookeeper
+  (:use [org.apache.storm zookeeper util config])
+  (:gen-class))
+
+(defn -main [& args]
+  (let [conf (read-storm-config)
+        port (conf STORM-ZOOKEEPER-PORT)
+        localpath (conf DEV-ZOOKEEPER-PATH)]
+    (rmr localpath)
+    (mk-inprocess-zookeeper localpath :port port)
+    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
new file mode 100644
index 0000000..c267390
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
@@ -0,0 +1,52 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.get-errors
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [org.apache.storm thrift log])
+  (:use [org.apache.storm util])
+  (:require [org.apache.storm.daemon
+             [nimbus :as nimbus]
+             [common :as common]])
+  (:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice
+            TopologySummary ErrorInfo])
+  (:gen-class))
+
+(defn get-topology-id [name topologies]
+  (let [topology (first (filter #(= (.get_name %1) name) topologies))]
+    (when (not-nil? topology) (.get_id topology))))
+
+(defn get-component-errors
+  [topology-errors]
+  (apply hash-map (remove nil?
+                    (flatten (for [[comp-name comp-errors] topology-errors]
+                               (let [latest-error (when (not (empty? comp-errors)) (first comp-errors))]
+                                 (if latest-error [comp-name (.get_error ^ErrorInfo latest-error)])))))))
+
+(defn -main [name]
+  (with-configured-nimbus-connection nimbus
+    (let [opts (doto (GetInfoOptions.)
+                 (.set_num_err_choice NumErrorsChoice/ONE))
+          cluster-info (.getClusterInfo nimbus)
+          topologies (.get_topologies cluster-info)
+          topo-id (get-topology-id name topologies)
+          topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))]
+      (if (or (nil? topo-id) (nil? topo-info))
+        (println (to-json {"Failure" (str "No topologies running with name " name)}))
+        (let [topology-name (.get_name topo-info)
+              topology-errors (.get_errors topo-info)]
+          (println (to-json (hash-map
+                              "Topology Name" topology-name
+                              "Comp-Errors" (get-component-errors topology-errors)))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
new file mode 100644
index 0000000..d96d7b3
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
@@ -0,0 +1,88 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.healthcheck
+  (:require [org.apache.storm
+             [config :refer :all]
+             [log :refer :all]]
+            [clojure.java [io :as io]]
+            [clojure [string :refer [split]]])
+  (:gen-class))
+
+(defn interrupter
+  "Interrupt a given thread after ms milliseconds."
+  [thread ms]
+  (let [interrupter (Thread.
+                     (fn []
+                       (try
+                         (Thread/sleep ms)
+                         (.interrupt thread)
+                         (catch InterruptedException e))))]
+    (.start interrupter)
+    interrupter))
+
+(defn check-output [lines]
+  (if (some #(.startsWith % "ERROR") lines)
+    :failed
+    :success))
+
+(defn process-script [conf script]
+  (let [script-proc (. (Runtime/getRuntime) (exec script))
+        curthread (Thread/currentThread)
+        interrupter-thread (interrupter curthread
+                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
+    (try
+      (.waitFor script-proc)
+      (.interrupt interrupter-thread)
+      (if (not (= (.exitValue script-proc) 0))
+        :failed_with_exit_code
+        (check-output (split
+                       (slurp (.getInputStream script-proc))
+                       #"\n+")))
+      (catch InterruptedException e
+        (println "Script" script "timed out.")
+        :timeout)
+      (catch Exception e
+        (println "Script failed with exception: " e)
+        :failed_with_exception)
+      (finally (.interrupt interrupter-thread)))))
+
+(defn health-check [conf]
+  (let [health-dir (absolute-healthcheck-dir conf)
+        health-files (file-seq (io/file health-dir))
+        health-scripts (filter #(and (.canExecute %)
+                                     (not (.isDirectory %)))
+                               health-files)
+        results (->> health-scripts
+                     (map #(.getAbsolutePath %))
+                     (map (partial process-script conf)))]
+    (log-message
+     (pr-str (map #'vector
+                  (map #(.getAbsolutePath %) health-scripts)
+                  results)))
+    ; failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
+    ; We treat non-zero exit codes as indicators that the scripts failed
+    ; to execute properly, not that the system is unhealthy, in which case
+    ; we don't want to start killing things.
+    (if (every? #(or (= % :failed_with_exit_code)
+                     (= % :success))
+                results)
+      0
+      1)))
+
+(defn -main [& args]
+  (let [conf (read-storm-config)]
+    (System/exit
+     (health-check conf))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
new file mode 100644
index 0000000..ff28cba
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
@@ -0,0 +1,52 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.heartbeats
+  (:require [org.apache.storm
+             [config :refer :all]
+             [log :refer :all]
+             [cluster :refer :all]
+             [converter :refer :all]]
+        [clojure.string :refer :all])
+  (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
+           [org.apache.storm.utils Utils])
+  (:gen-class))
+
+(defn -main [command path & args]
+  (let [conf (read-storm-config)
+        cluster (mk-distributed-cluster-state conf :auth-conf conf)]
+    (println "Command: [" command "]")
+    (condp = command
+      "list"
+      (let [message (join " \n" (.get_worker_hb_children cluster path false))]
+        (log-message "list " path ":\n"
+                     message "\n"))
+      "get"
+      (log-message 
+       (if-let [hb (.get_worker_hb cluster path false)]
+         (clojurify-zk-worker-hb
+          (Utils/deserialize
+           hb
+           ClusterWorkerHeartbeat))
+         "Nothing"))
+      
+      (log-message "Usage: heartbeats [list|get] path"))
+    
+    (try
+      (.close cluster)
+      (catch Exception e
+        (log-message "Caught exception: " e " on close."))))
+  (System/exit 0))
+         

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
new file mode 100644
index 0000000..84e0a64
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
@@ -0,0 +1,29 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.kill-topology
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [org.apache.storm thrift config log])
+  (:import [org.apache.storm.generated KillOptions])
+  (:gen-class))
+
+(defn -main [& args]
+  (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
+        opts (KillOptions.)]
+    (if wait (.set_wait_secs opts wait))
+    (with-configured-nimbus-connection nimbus
+      (.killTopologyWithOpts nimbus name opts)
+      (log-message "Killed topology: " name)
+      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
new file mode 100644
index 0000000..2670735
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
@@ -0,0 +1,33 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.kill-workers
+  (:import [java.io File])
+  (:use [org.apache.storm.daemon common])
+  (:use [org.apache.storm util config])
+  (:require [org.apache.storm.daemon
+             [supervisor :as supervisor]])
+  (:gen-class))
+
+(defn -main 
+  "Construct the supervisor-data from scratch and kill the workers on this supervisor"
+  [& args]
+  (let [conf (read-storm-config)
+        conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
+        isupervisor (supervisor/standalone-supervisor)
+        supervisor-data (supervisor/supervisor-data conf nil isupervisor)
+        ids (supervisor/my-worker-ids conf)]
+    (doseq [id ids]
+      (supervisor/shutdown-worker supervisor-data id))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/list.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/list.clj b/storm-core/src/clj/org/apache/storm/command/list.clj
new file mode 100644
index 0000000..87975cd
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/list.clj
@@ -0,0 +1,38 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.list
+  (:use [org.apache.storm thrift log])
+  (:import [org.apache.storm.generated TopologySummary])
+  (:gen-class))
+
+(defn -main []
+  (with-configured-nimbus-connection nimbus
+    (let [cluster-info (.getClusterInfo nimbus)
+          topologies (.get_topologies cluster-info)
+          msg-format "%-20s %-10s %-10s %-12s %-10s"]
+      (if (or (nil? topologies) (empty? topologies))
+        (println "No topologies running.")
+        (do
+          (println (format msg-format "Topology_name" "Status" "Num_tasks" "Num_workers" "Uptime_secs"))
+          (println "-------------------------------------------------------------------")
+          (doseq [^TopologySummary topology topologies]
+            (let [topology-name (.get_name topology)
+                  topology-status (.get_status topology)
+                  topology-num-tasks (.get_num_tasks topology)
+                  topology-num-workers (.get_num_workers topology)
+                  topology-uptime-secs (.get_uptime_secs topology)]
+              (println (format msg-format  topology-name topology-status topology-num-tasks
+                               topology-num-workers topology-uptime-secs)))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/monitor.clj b/storm-core/src/clj/org/apache/storm/command/monitor.clj
new file mode 100644
index 0000000..7fa9b2a
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/monitor.clj
@@ -0,0 +1,37 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.monitor
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [org.apache.storm.thrift :only [with-configured-nimbus-connection]])
+  (:import [org.apache.storm.utils Monitor])
+  (:gen-class)
+ )
+
+(defn -main [& args]
+  (let [[{interval :interval component :component stream :stream watch :watch} [name] _]
+        (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)]
+          ["-m" "--component" :default nil]
+          ["-s" "--stream" :default "default"]
+          ["-w" "--watch" :default "emitted"])
+        mon (Monitor.)]
+    (if interval (.set_interval mon interval))
+    (if name (.set_topology mon name))
+    (if component (.set_component mon component))
+    (if stream (.set_stream mon stream))
+    (if watch (.set_watch mon watch))
+    (with-configured-nimbus-connection nimbus
+      (.metrics mon nimbus)
+      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
new file mode 100644
index 0000000..3868091
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
@@ -0,0 +1,46 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.rebalance
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [org.apache.storm thrift config log])
+  (:import [org.apache.storm.generated RebalanceOptions])
+  (:gen-class))
+
+(defn- parse-executor [^String s]
+  (let [eq-pos (.lastIndexOf s "=")
+        name (.substring s 0 eq-pos)
+        amt (.substring s (inc eq-pos))]
+    {name (Integer/parseInt amt)}
+    ))
+
+(defn -main [& args] 
+  (let [[{wait :wait executor :executor num-workers :num-workers} [name] _]
+                  (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]
+                            ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)]
+                            ["-e" "--executor"  :parse-fn parse-executor
+                             :assoc-fn (fn [previous key val]
+                                         (assoc previous key
+                                                (if-let [oldval (get previous key)]
+                                                  (merge oldval val)
+                                                  val)))])
+        opts (RebalanceOptions.)]
+    (if wait (.set_wait_secs opts wait))
+    (if executor (.set_num_executors opts executor))
+    (if num-workers (.set_num_workers opts num-workers))
+    (with-configured-nimbus-connection nimbus
+      (.rebalance nimbus name opts)
+      (log-message "Topology " name " is rebalancing")
+      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
new file mode 100644
index 0000000..7e1c3c5
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
@@ -0,0 +1,75 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.set-log-level
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [org.apache.storm thrift log])
+  (:import [org.apache.logging.log4j Level])
+  (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction])
+  (:gen-class))
+
+(defn- get-storm-id
+  "Get topology id for a running topology from the topology name."
+  [nimbus name]
+  (let [info (.getClusterInfo nimbus)
+        topologies (.get_topologies info)
+        topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))]
+    (if topology 
+      (.get_id topology)
+      (throw (.IllegalArgumentException (str name " is not a running topology"))))))
+
+(defn- parse-named-log-levels [action]
+  "Parses [logger name]=[level string]:[optional timeout],[logger name2]...
+
+   e.g. ROOT=DEBUG:30
+        root logger, debug for 30 seconds
+
+        org.apache.foo=WARN
+        org.apache.foo set to WARN indefinitely"
+  (fn [^String s]
+    (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s)
+          name (if (= action LogLevelAction/REMOVE) s (nth log-args 1))
+          level (Level/toLevel (nth log-args 2))
+          timeout-str (nth log-args 3)
+          log-level (LogLevel.)]
+      (if (= action LogLevelAction/REMOVE)
+        (.set_action log-level action)
+        (do
+          (.set_action log-level action)
+          (.set_target_log_level log-level (.toString level))
+          (.set_reset_log_level_timeout_secs log-level
+            (Integer. (if (= timeout-str "") "0" timeout-str)))))
+      {name log-level})))
+
+(defn- merge-together [previous key val]
+   (assoc previous key
+      (if-let [oldval (get previous key)]
+         (merge oldval val)
+         val)))
+
+(defn -main [& args]
+  (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _]
+        (cli args ["-l" "--log-setting"
+                   :parse-fn (parse-named-log-levels LogLevelAction/UPDATE)
+                   :assoc-fn merge-together]
+                  ["-r" "--remove-log-setting"
+                   :parse-fn (parse-named-log-levels LogLevelAction/REMOVE)
+                   :assoc-fn merge-together])
+        log-config (LogConfig.)]
+    (doseq [[log-name log-val] (merge log-setting remove-log-setting)]
+      (.put_to_named_logger_level log-config log-name log-val))
+    (log-message "Sent log config " log-config " for topology " name)
+    (with-configured-nimbus-connection nimbus
+      (.setLogConfig nimbus (get-storm-id nimbus name) log-config))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
new file mode 100644
index 0000000..b09c4f7
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -0,0 +1,33 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.shell-submission
+  (:import [org.apache.storm StormSubmitter])
+  (:use [org.apache.storm thrift util config log zookeeper])
+  (:require [clojure.string :as str])
+  (:gen-class))
+
+
+(defn -main [^String tmpjarpath & args]
+  (let [conf (read-storm-config)
+        zk-leader-elector (zk-leader-elector conf)
+        leader-nimbus (.getLeader zk-leader-elector)
+        host (.getHost leader-nimbus)
+        port (.getPort leader-nimbus)
+        no-op (.close zk-leader-elector)
+        jarpath (StormSubmitter/submitJar conf tmpjarpath)
+        args (concat args [host port jarpath])]
+    (exec-command! (str/join " " args))
+    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj b/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj
new file mode 100644
index 0000000..f63bde4
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj
@@ -0,0 +1,35 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.command.upload-credentials
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [org.apache.storm log util])
+  (:import [org.apache.storm StormSubmitter])
+  (:import [java.util Properties])
+  (:import [java.io FileReader])
+  (:gen-class))
+
+(defn read-map [file-name]
+  (let [props (Properties. )
+        _ (.load props (FileReader. file-name))]
+    (clojurify-structure props)))
+
+(defn -main [& args]
+  (let [[{cred-file :file} [name & rawCreds]] (cli args ["-f" "--file" :default nil])
+        _ (when (and rawCreds (not (even? (.size rawCreds)))) (throw (RuntimeException.  "Need an even number of arguments to make a map")))
+        mapping (if rawCreds (apply assoc {} rawCreds) {})
+        file-mapping (if (nil? cred-file) {} (read-map cred-file))]
+      (StormSubmitter/pushCredentials name {} (merge file-mapping mapping))
+      (log-message "Uploaded new creds to topology: " name)))