You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:20 UTC

[35/50] [abbrv] storm git commit: Merge remote-tracking branch 'upstream/master' into STORM-166

Merge remote-tracking branch 'upstream/master' into STORM-166

Conflicts:
	conf/defaults.yaml
	storm-core/src/clj/backtype/storm/cluster.clj
	storm-core/src/clj/backtype/storm/daemon/nimbus.clj
	storm-core/src/clj/backtype/storm/ui/core.clj
	storm-core/test/clj/backtype/storm/cluster_test.clj
	storm-core/test/clj/backtype/storm/security/auth/auth_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/57587182
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/57587182
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/57587182

Branch: refs/heads/master
Commit: 575871822b116608f47a0a7a4cd3b4b17df9a672
Parents: f7205d2 bb8d48d
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Mar 19 15:04:43 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Mar 19 15:24:50 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md                                    |   21 +
 DEVELOPER.md                                    |    7 +
 README.markdown                                 |    2 +
 SECURITY.md                                     |   59 +-
 STORM-UI-REST-API.md                            |   12 +
 bin/storm                                       |  584 +--------
 bin/storm.py                                    |  543 ++++++++
 conf/defaults.yaml                              |    3 +
 conf/storm-env.sh                               |   24 +
 dev-tools/test-ns.py                            |   17 +
 docs/documentation/Clojure-DSL.md               |    4 +-
 docs/documentation/Command-line-client.md       |    2 +-
 docs/documentation/Common-patterns.md           |    6 +-
 docs/documentation/Concepts.md                  |   48 +-
 docs/documentation/Configuration.md             |    4 +-
 docs/documentation/Distributed-RPC.md           |    2 +-
 .../Guaranteeing-message-processing.md          |    6 +-
 docs/documentation/Hooks.md                     |    6 +-
 docs/documentation/Local-mode.md                |    4 +-
 docs/documentation/Powered-By.md                |  106 +-
 ...unning-topologies-on-a-production-cluster.md |    6 +-
 .../Serialization-(prior-to-0.6.0).md           |    4 +-
 docs/documentation/Serialization.md             |    2 +-
 docs/documentation/Structure-of-the-codebase.md |    8 +-
 docs/documentation/Transactional-topologies.md  |   18 +-
 docs/documentation/Tutorial.md                  |    8 +-
 ...nding-the-parallelism-of-a-Storm-topology.md |   16 +-
 external/README.md                              |   18 +
 external/storm-jdbc/README.md                   |   84 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |   36 +-
 .../org/apache/storm/jdbc/common/Column.java    |    3 +-
 .../apache/storm/jdbc/common/JdbcClient.java    |   52 +-
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |    2 +-
 .../storm/jdbc/trident/state/JdbcState.java     |   18 +-
 .../storm/jdbc/common/JdbcClientTest.java       |   39 +-
 .../jdbc/topology/UserPersistanceTopology.java  |    2 +-
 .../ExponentialBackoffMsgRetryManager.java      |    2 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |    7 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |    2 +-
 .../storm/redis/trident/state/RedisState.java   |    2 +-
 storm-core/pom.xml                              |    3 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |   60 +-
 storm-core/src/clj/backtype/storm/config.clj    |    7 +-
 storm-core/src/clj/backtype/storm/converter.clj |  201 +++
 .../src/clj/backtype/storm/daemon/common.clj    |   10 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |    4 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |    5 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  150 ++-
 .../clj/backtype/storm/daemon/supervisor.clj    |   16 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   11 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |    3 +-
 storm-core/src/clj/backtype/storm/stats.clj     |   78 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |   32 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   42 +-
 storm-core/src/genthrift.sh                     |   13 +-
 storm-core/src/java_license_header.txt          |   17 +
 storm-core/src/jvm/backtype/storm/Config.java   |   40 +-
 .../jvm/backtype/storm/ConfigValidation.java    |    8 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |   88 +-
 .../storm/coordination/BatchBoltExecutor.java   |    4 +-
 .../backtype/storm/generated/Assignment.java    |  983 ++++++++++++++
 .../storm/generated/ClusterWorkerHeartbeat.java |  673 ++++++++++
 .../backtype/storm/generated/ExecutorStats.java |  105 +-
 .../jvm/backtype/storm/generated/NodeInfo.java  |  556 ++++++++
 .../jvm/backtype/storm/generated/StormBase.java | 1211 ++++++++++++++++++
 .../storm/generated/SupervisorInfo.java         | 1182 +++++++++++++++++
 .../storm/generated/TopologyActionOptions.java  |  387 ++++++
 .../storm/generated/TopologyStatus.java         |   68 +
 .../backtype/storm/messaging/netty/Client.java  |   10 +-
 .../backtype/storm/messaging/netty/Context.java |   33 +-
 .../auth/DefaultHttpCredentialsPlugin.java      |   19 +-
 .../storm/security/auth/ITransportPlugin.java   |    4 +-
 .../storm/security/auth/ReqContext.java         |   28 +-
 .../security/auth/SaslTransportPlugin.java      |    3 +-
 .../security/auth/SimpleTransportPlugin.java    |    5 +-
 .../storm/security/auth/TBackoffConnect.java    |    4 +-
 .../storm/security/auth/ThriftClient.java       |   12 +-
 .../authorizer/ImpersonationAuthorizer.java     |  154 +++
 .../auth/authorizer/SimpleACLAuthorizer.java    |   55 +-
 .../auth/digest/DigestSaslTransportPlugin.java  |    6 +-
 .../auth/digest/ServerCallbackHandler.java      |   21 +-
 .../kerberos/KerberosSaslTransportPlugin.java   |    9 +-
 .../auth/kerberos/ServerCallbackHandler.java    |   38 +-
 .../DefaultSerializationDelegate.java           |   11 +-
 .../GzipBridgeSerializationDelegate.java        |    7 +-
 .../GzipSerializationDelegate.java              |   10 +-
 .../serialization/SerializationDelegate.java    |    2 +-
 .../ThriftSerializationDelegate.java            |   52 +
 .../backtype/storm/topology/BoltDeclarer.java   |    4 +
 .../backtype/storm/topology/InputDeclarer.java  |  128 ++
 .../storm/topology/TopologyBuilder.java         |    6 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |    2 +-
 .../jvm/backtype/storm/utils/LocalState.java    |    4 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   16 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   82 +-
 .../backtype/storm/utils/VersionedStore.java    |    9 +-
 storm-core/src/jvm/storm/trident/Stream.java    |    2 +-
 .../src/jvm/storm/trident/TridentTopology.java  |    6 +-
 storm-core/src/py/storm/Nimbus-remote           |    0
 storm-core/src/py/storm/ttypes.py               |  827 +++++++++++-
 storm-core/src/py_license_header.txt            |   18 +
 storm-core/src/storm.thrift                     |   51 +
 .../templates/component-page-template.html      |    2 +-
 .../templates/topology-page-template.html       |    4 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   35 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    6 +-
 .../auth/DefaultHttpCredentialsPlugin_test.clj  |   16 +-
 .../backtype/storm/security/auth/auth_test.clj  |  146 ++-
 .../GzipBridgeSerializationDelegateTest.java    |    6 +-
 .../ThriftBridgeSerializationDelegateTest.java  |   60 +
 110 files changed, 8564 insertions(+), 1095 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/STORM-UI-REST-API.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index 305b31c,78e89bb..49584f2
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -47,11 -47,10 +47,12 @@@ storm.auth.simple-white-list.users: [
  storm.auth.simple-acl.users: []
  storm.auth.simple-acl.users.commands: []
  storm.auth.simple-acl.admins: []
 +storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
 +storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
+ storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegate"
  
  ### nimbus.* configs are for the master
 -nimbus.host: "localhost"
 +nimbus.seeds : ["localhost:6627"]
  nimbus.thrift.port: 6627
  nimbus.thrift.threads: 64
  nimbus.thrift.max_buffer_size: 1048576

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index c48e3c1,7987a30..333feec
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -15,13 -15,14 +15,15 @@@
  ;; limitations under the License.
  
  (ns backtype.storm.cluster
-   (:import [org.apache.zookeeper.data Stat ACL Id])
+   (:import [org.apache.zookeeper.data Stat ACL Id]
 -           [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials]
++           [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary]
+            [java.io Serializable])
    (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
    (:import [backtype.storm.utils Utils])
    (:import [java.security MessageDigest])
    (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
 +  (:import [backtype.storm.nimbus NimbusInfo])
-   (:use [backtype.storm util log config])
+   (:use [backtype.storm util log config converter])
    (:require [backtype.storm [zookeeper :as zk]])
    (:require [backtype.storm.daemon [common :as common]]))
  
@@@ -348,25 -322,6 +350,25 @@@
            (swap! assignment-version-callback assoc storm-id callback))
          (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
  
 +      (code-distributor
 +        [this callback]
 +        (when callback
 +          (reset! code-distributor-callback callback))
 +        (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback)))
 +
 +      (nimbuses
 +        [this]
-         (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) false))
++        (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) false) NimbusSummary)
 +          (get-children cluster-state NIMBUSES-SUBTREE false)))
 +
 +      (add-nimbus-host!
 +        [this nimbus-id nimbus-summary]
 +        (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
 +
 +      (code-distributor-info
 +        [this storm-id]
 +        (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children cluster-state (code-distributor-path storm-id) false)))
 +
        (active-storms
          [this]
          (get-children cluster-state STORMS-SUBTREE false))
@@@ -465,13 -428,9 +475,14 @@@
  
        (set-assignment!
          [this storm-id info]
-         (set-data cluster-state (assignment-path storm-id) (Utils/serialize info) acls))
+         (let [thrift-assignment (thriftify-assignment info)]
+           (set-data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
  
 +      (setup-code-distributor!
 +        [this storm-id nimbusInfo]
 +        (mkdirs cluster-state (code-distributor-path storm-id) acls)
 +        (mkdirs cluster-state (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo)) acls))
 +
        (remove-storm!
          [this storm-id]
          (delete-node cluster-state (assignment-path storm-id))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index eee0417,d1a1a59..8a2c0fb
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -91,9 -77,9 +91,10 @@@
  (defn nimbus-data [conf inimbus]
    (let [forced-scheduler (.getForcedScheduler inimbus)]
      {:conf conf
 +     :nimbus-host-port-info (NimbusInfo/fromConf conf)
       :inimbus inimbus
       :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) conf)
+      :impersonation-authorization-handler (mk-authorization-handler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf)
       :submitted-count (atom 0)
       :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
                                                                         (Utils/isZkAuthenticationConfiguredStormServer
@@@ -221,11 -195,11 +212,12 @@@
    ([nimbus storm-id event]
       (transition! nimbus storm-id event false))
    ([nimbus storm-id event error-on-no-transition?]
 -     (locking (:submit-lock nimbus)
 +    (is-leader nimbus)
 +    (locking (:submit-lock nimbus)
         (let [system-events #{:startup}
               [event & event-args] (if (keyword? event) [event] event)
-              status (topology-status nimbus storm-id)]
+              storm-base (-> nimbus :storm-cluster-state  (.storm-base storm-id nil))
+              status (:status storm-base)]
           ;; handles the case where event was scheduled but topology has been removed
           (if-not status
             (log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
@@@ -347,32 -323,9 +341,32 @@@
     (FileUtils/cleanDirectory (File. stormroot))
     (setup-jar conf tmp-jar-location stormroot)
     (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
-    (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
+    (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/javaSerialize storm-conf))
 +   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
     ))
  
 +(defn- wait-for-desired-code-replication [nimbus conf storm-id]
 +  (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
 +        max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
 +        total-wait-time (atom 0)
 +        current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
 +  (if (:code-distributor nimbus)
 +    (while (and (> min-replication-count @current-replication-count)
 +             (or (= -1 max-replication-wait-time)
 +               (< @total-wait-time max-replication-wait-time)))
 +        (sleep-secs 1)
 +        (log-debug "waiting for desired replication to be achieved.
 +          min-replication-count = " min-replication-count  " max-replication-wait-time = " max-replication-wait-time
 +          "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
 +        (swap! total-wait-time inc)
 +        (reset! current-replication-count  (.getReplicationCount (:code-distributor nimbus) storm-id))))
 +  (if (< min-replication-count @current-replication-count)
 +    (log-message "desired replication count "  min-replication-count " achieved,
 +      current-replication-count" @current-replication-count)
 +    (log-message "desired replication count of "  min-replication-count " not achieved but we have hit the max wait time
 +      so moving on with replication count = " @current-replication-count)
 +    )))
 +
  (defn- read-storm-topology [conf storm-id]
    (let [stormroot (master-stormdist-root conf storm-id)]
      (Utils/deserialize
@@@ -708,10 -661,9 +702,10 @@@
  ;; only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
  ;; edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
  (defnk mk-assignments [nimbus :scratch-topology-id nil]
 -  (let [conf (:conf nimbus)
 +  (if (is-leader nimbus :throw-exception false)
 +    (let [conf (:conf nimbus)
          storm-cluster-state (:storm-cluster-state nimbus)
-         ^INimbus inimbus (:inimbus nimbus) 
+         ^INimbus inimbus (:inimbus nimbus)
          ;; read all the topologies
          topology-ids (.active-storms storm-cluster-state)
          topologies (into {} (for [tid topology-ids]
@@@ -775,14 -727,14 +769,14 @@@
      (->> new-assignments
            (map (fn [[topology-id assignment]]
              (let [existing-assignment (get existing-assignments topology-id)]
-               [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] 
+               [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
                )))
            (into {})
 -          (.assignSlots inimbus topologies))
 -    ))
 +          (.assignSlots inimbus topologies)))
 +    (log-message "not a leader, skipping assignments")))
  
  (defn- start-storm [nimbus storm-name storm-id topology-initial-status]
-   {:pre [(#{:active :inactive} topology-initial-status)]}                
+   {:pre [(#{:active :inactive} topology-initial-status)]}
    (let [storm-cluster-state (:storm-cluster-state nimbus)
          conf (:conf nimbus)
          storm-conf (read-storm-conf conf storm-id)
@@@ -1049,24 -1009,9 +1058,23 @@@
    (let [nimbus (nimbus-data conf inimbus)
         principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
      (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
 +
 +    ;add to nimbuses
-     (.add-nimbus-host! (:storm-cluster-state nimbus)
-       (.toHostPortString (:nimbus-host-port-info nimbus))
-       {
-         :host (.getHost (:nimbus-host-port-info nimbus))
-         :port (.getPort (:nimbus-host-port-info nimbus))
-         :start-time-secs (current-time-secs)
-         :version (str (VersionInfo/getVersion))
-         })
++    (.add-nimbus-host! (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus))
++      (NimbusSummary.
++        (.getHost (:nimbus-host-port-info nimbus))
++        (.getPort (:nimbus-host-port-info nimbus))
++        (current-time-secs)
++        false ;is-leader
++        (str (VersionInfo/getVersion))))
 +
 +    (.addToLeaderLockQueue (:leader-elector nimbus))
      (cleanup-corrupt-topologies! nimbus)
 -    (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 -      (transition! nimbus storm-id :startup))
 +    ;register call back for code-distributor
 +    (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
 +    (when (is-leader nimbus :throw-exception false)
 +      (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 +        (transition! nimbus storm-id :startup)))
      (schedule-recurring (:timer nimbus)
                          0
                          (conf NIMBUS-MONITOR-FREQ-SECS)
@@@ -1317,14 -1253,8 +1325,16 @@@
                                                                  (count (:used-ports info))
                                                                  id )
                                              ))
 -              nimbus-uptime ((:uptime nimbus))
                bases (topology-bases storm-cluster-state)
 +              nimbuses (.nimbuses storm-cluster-state)
-               nimbuses (map #(NimbusSummary. (:host %1) (:port %1) (time-delta (:start-time-secs %1))
-                                (let [leader (.getLeader (:leader-elector nimbus))]
-                                  (and (= (.getHost leader) (:host %1)) (= (.getPort leader) (:port %1))))
-                                (:version %1))
-                          nimbuses
-                          )
++
++              ;;update the isLeader field for each nimbus summary
++              _ (let [leader (.getLeader (:leader-elector nimbus))
++                      leader-host (.getHost leader)
++                      leader-port (.getPort leader)]
++                  (doseq [nimbus-summary nimbuses]
++                    (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
++
                topology-summaries (dofor [[id base] bases :when base]
  	                                  (let [assignment (.assignment-info storm-cluster-state id nil)
                                                  topo-summ (TopologySummary. id

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d315925,bc8b999..4fc219e
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -376,7 -377,7 +382,7 @@@
              master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
              stormroot (supervisor-stormdist-root conf storm-id)]
          (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
--          (download-storm-code conf storm-id master-code-dir download-lock))
++          (download-storm-code conf storm-id master-code-dir supervisor download-lock))
          ))
  
      (wait-for-workers-launch
@@@ -725,7 -728,7 +731,7 @@@
         first ))
  
  (defmethod download-storm-code
--    :local [conf storm-id master-code-dir download-lock]
++    :local [conf storm-id master-code-dir supervisor download-lock]
      (let [stormroot (supervisor-stormdist-root conf storm-id)]
        (locking download-lock
              (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/thrift.clj
index 474ea67,6445f46..8f4c659
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@@ -20,9 -22,9 +22,10 @@@
              StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
              ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
              GlobalStreamId ComponentObject ComponentObject$_Fields
-             ShellComponent])
+             ShellComponent SupervisorInfo])
    (:import [backtype.storm.utils Utils NimbusClient])
    (:import [backtype.storm Constants])
++  (:import [backtype.storm.security.auth ReqContext])
    (:import [backtype.storm.grouping CustomStreamGrouping])
    (:import [backtype.storm.topology TopologyBuilder])
    (:import [backtype.storm.clojure RichShellBolt RichShellSpout])
@@@ -84,13 -88,17 +89,15 @@@
  (defmacro with-configured-nimbus-connection
    [client-sym & body]
    `(let [conf# (read-storm-config)
-          nimbusClient# (NimbusClient/getConfiguredClient conf#)
 -         host# (conf# NIMBUS-HOST)
 -         port# (conf# NIMBUS-THRIFT-PORT)]
 -    (with-nimbus-connection [~client-sym host# port#]
 -      ~@body)))
++         context# (ReqContext/context)
++         user# (if (.principal context#) (.getName (.principal context#)))
++         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
 +         ~client-sym (.getClient nimbusClient#)
 +         conn# (.transport nimbusClient#)
 +         ]
 +     (try
 +       ~@body
 +     (finally (.close conn#)))))
  
  (defn direct-output-fields
    [fields]

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 0f93f23,c64f35d..553434e
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -931,18 -929,19 +947,19 @@@
    (GET "/api/v1/token" [ & m]
         (json-response (format "{\"antiForgeryToken\": \"%s\"}" *anti-forgery-token*) (:callback m) :serialize-fn identity))
    (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
 +    (thrift/with-configured-nimbus-connection nimbus
+     (assert-authorized-user servlet-request "activate" (topology-config id))
 -    (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))
                        (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
              name (.get_name tplg)]
-         (assert-authorized-user servlet-request "activate" (topology-config id))
          (.activate nimbus name)
          (log-message "Activating topology '" name "'")))
 -    (json-response (topology-op-response id "deactivate") (m "callback")))
 +    (json-response (topology-op-response id "activate") (m "callback")))
    (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m]
 +    (thrift/with-configured-nimbus-connection nimbus
+     (assert-authorized-user servlet-request "deactivate" (topology-config id))
 -    (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))
@@@ -953,7 -951,8 +969,8 @@@
          (log-message "Deactivating topology '" name "'")))
      (json-response (topology-op-response id "deactivate") (m "callback")))
    (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
 +    (thrift/with-configured-nimbus-connection nimbus
+     (assert-authorized-user servlet-request "rebalance" (topology-config id))
 -    (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))
@@@ -972,7 -970,8 +988,8 @@@
          (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
      (json-response (topology-op-response id "rebalance") (m "callback")))
    (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
+     (assert-authorized-user servlet-request "killTopology" (topology-config id))
 -    (with-nimbus nimbus
 +    (thrift/with-configured-nimbus-connection nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index 39d3895,b171353..071d2b6
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@@ -40,31 -30,23 +40,36 @@@ public class NimbusClient extends Thrif
      private Nimbus.Client _client;
      private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
  
+ 
      public static NimbusClient getConfiguredClient(Map conf) {
 -        try {
 -            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
 -            return new NimbusClient(conf, nimbusHost);
 -        } catch (TTransportException ex) {
 -            throw new RuntimeException(ex);
 -        }
++        return getConfiguredClientAs(conf, null);
+     }
+ 
+     public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
 -        try {
 -            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
 -            return new NimbusClient(conf, nimbusHost, null, null, asUser);
 -        } catch (TTransportException ex) {
 -            throw new RuntimeException(ex);
 +        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
 +        for(String seed : seeds) {
 +            String[] split = seed.split(":");
 +            String host = split[0];
 +            int port = Integer.parseInt(split[1]);
 +            try {
 +                NimbusClient client = new NimbusClient(conf,host,port);
 +                ClusterSummary clusterInfo = client.getClient().getClusterInfo();
 +                List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
 +                if(nimbuses != null) {
 +                    for(NimbusSummary nimbusSummary : nimbuses) {
 +                        if(nimbusSummary.is_isLeader()) {
-                             return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port());
++                            return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
 +                        }
 +                    }
 +                }
 +                throw new RuntimeException("Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
 +                        "again after some time.");
 +            } catch (Exception e) {
 +                LOG.warn("Ignoring exception while trying to get leader nimbus info from " + seed, e);
 +            }
          }
 +        throw new RuntimeException("Could not find leader nimbus from seed hosts " + seeds +". " +
 +                "Did you specify a valid list of nimbus host:port for config " + Config.NIMBUS_SEEDS);
      }
  
      public NimbusClient(Map conf, String host, int port) throws TTransportException {

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/storm.thrift
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/cluster_test.clj
index 85aaf3b,98eae68..ffc72af
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@@ -169,12 -168,10 +169,14 @@@
  (deftest test-storm-cluster-state-basics
    (with-inprocess-zookeeper zk-port
      (let [state (mk-storm-state zk-port)
-           assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
-           assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
+           assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {})
+           assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {})
 +          nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
 +          nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
-           base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "")
-           base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "")]
++          nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (current-time-secs) false "v1")
++          nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (current-time-secs) false "v2")
+           base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil)
+           base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil)]
        (is (= [] (.assignments state nil)))
        (.set-assignment! state "storm1" assignment1)
        (is (= assignment1 (.assignment-info state "storm1" nil)))
@@@ -204,21 -201,6 +206,21 @@@
        (.set-credentials! state "storm1" {"b" "b"} {})
        (is (= {"b" "b"} (.credentials state "storm1" nil)))
  
 +      (is (= [] (.code-distributor state nil)))
 +      (.setup-code-distributor! state "storm1" nimbusInfo1)
 +      (is (= ["storm1"] (.code-distributor state nil)))
 +      (is (= [nimbusInfo1] (.code-distributor-info state "storm1")))
 +      (.setup-code-distributor! state "storm1" nimbusInfo2)
 +      (is (= #{nimbusInfo1 nimbusInfo2} (set (.code-distributor-info state "storm1"))))
 +      (.remove-storm! state "storm1")
 +      (is (= [] (.code-distributor state nil)))
 +
 +      (is (= [] (.nimbuses state)))
-       (.add-nimbus-host! state "host:port" nimbusInfo1)
-       (is (= [nimbusInfo1] (.nimbuses state)))
-       (.add-nimbus-host! state "host1:port" nimbusInfo2)
-       (is (= #{nimbusInfo1 nimbusInfo2} (set (.nimbuses state))))
++      (.add-nimbus-host! state "nimbus1:port" nimbusSummary1)
++      (is (= [nimbusSummary1] (.nimbuses state)))
++      (.add-nimbus-host! state "nimbus2:port" nimbusSummary2)
++      (is (= #{nimbusSummary1 nimbusSummary2} (set (.nimbuses state))))
 +
        ;; TODO add tests for task info and task heartbeat setting and getting
        (.disconnect state)
        )))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index 416dddf,ddd5e03..cb4ccc8
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@@ -117,11 -119,12 +119,11 @@@
           (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
    ([conf inimbus]
       (dummy-service-handler conf inimbus nil)))
-      
  
- (defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf] 
+ 
+ (defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf]
    (let [conf1 (merge (read-storm-config)
-                      {NIMBUS-AUTHORIZER aznClass 
+                      {NIMBUS-AUTHORIZER aznClass
 -                      NIMBUS-HOST "localhost"
                        NIMBUS-THRIFT-PORT server-port
                        STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
          conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)