You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:50:16 UTC

[45/45] storm git commit: Merge remote-tracking branch 'apache/master' into nimbus-ha

Merge remote-tracking branch 'apache/master' into nimbus-ha

Conflicts:
	storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
	storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/765e4c2f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/765e4c2f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/765e4c2f

Branch: refs/heads/nimbus-ha-branch
Commit: 765e4c2fabafbfe31fb0f43e5b670fd3b91d1a2e
Parents: a11fcc3 2aaa718
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Apr 1 10:44:56 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Apr 1 10:44:56 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md                                    |  13 ++
 README.markdown                                 |   2 +
 STORM-UI-REST-API.md                            |  30 +++
 docs/_posts/2015-03-25-storm094-released.md     |  24 ++
 docs/downloads.html                             |  64 ++++--
 examples/storm-starter/pom.xml                  |   2 +-
 external/storm-hbase/pom.xml                    |   2 +-
 .../org/apache/storm/hbase/bolt/HBaseBolt.java  |   2 +-
 .../storm/hbase/bolt/HBaseLookupBolt.java       |   4 +-
 .../hbase/trident/state/HBaseMapState.java      |   2 +
 .../storm/hbase/trident/state/HBaseState.java   |   2 +-
 external/storm-hdfs/pom.xml                     |   2 +-
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    |   2 +-
 .../storm/hdfs/bolt/SequenceFileBolt.java       |   2 +-
 external/storm-hive/pom.xml                     |   2 +-
 external/storm-jdbc/pom.xml                     |   2 +-
 external/storm-kafka/pom.xml                    |   2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |  16 +-
 external/storm-redis/README.md                  | 108 ++++++++-
 external/storm-redis/pom.xml                    |   2 +-
 .../storm/redis/bolt/AbstractRedisBolt.java     |   8 +-
 .../storm/redis/bolt/RedisLookupBolt.java       | 112 ++++++++++
 .../apache/storm/redis/bolt/RedisStoreBolt.java | 100 +++++++++
 .../redis/common/config/JedisClusterConfig.java |  82 +++++++
 .../redis/common/config/JedisPoolConfig.java    |  97 +++++++++
 .../common/container/JedisClusterContainer.java |  47 ++++
 .../JedisCommandsContainerBuilder.java          |  38 ++++
 .../JedisCommandsInstanceContainer.java         |  25 +++
 .../redis/common/container/JedisContainer.java  |  65 ++++++
 .../common/mapper/RedisDataTypeDescription.java |  50 +++++
 .../redis/common/mapper/RedisLookupMapper.java  |  40 ++++
 .../storm/redis/common/mapper/RedisMapper.java  |  22 ++
 .../redis/common/mapper/RedisStoreMapper.java   |  21 ++
 .../storm/redis/common/mapper/TupleMapper.java  |  27 +++
 .../trident/mapper/TridentTupleMapper.java      |  27 ---
 .../trident/state/RedisClusterMapState.java     |   2 +-
 .../redis/trident/state/RedisClusterState.java  |   2 +-
 .../trident/state/RedisClusterStateQuerier.java |  10 +-
 .../trident/state/RedisClusterStateUpdater.java |  10 +-
 .../redis/trident/state/RedisMapState.java      |  21 +-
 .../storm/redis/trident/state/RedisState.java   |   2 +-
 .../redis/trident/state/RedisStateQuerier.java  |  10 +-
 .../state/RedisStateSetCountQuerier.java        |  74 -------
 .../trident/state/RedisStateSetUpdater.java     |  80 -------
 .../redis/trident/state/RedisStateUpdater.java  |  10 +-
 .../redis/util/config/JedisClusterConfig.java   |  82 -------
 .../redis/util/config/JedisPoolConfig.java      |  97 ---------
 .../util/container/JedisClusterContainer.java   |  47 ----
 .../JedisCommandsContainerBuilder.java          |  38 ----
 .../JedisCommandsInstanceContainer.java         |  25 ---
 .../redis/util/container/JedisContainer.java    |  65 ------
 .../storm/redis/topology/LookupWordCount.java   | 115 ++++++----
 .../redis/topology/PersistentWordCount.java     |  81 ++++---
 .../storm/redis/topology/WordCounter.java       |  19 +-
 .../redis/trident/WordCountTridentRedis.java    |   7 +-
 .../trident/WordCountTridentRedisCluster.java   |   6 +-
 .../WordCountTridentRedisClusterMap.java        |   8 +-
 .../redis/trident/WordCountTridentRedisMap.java |   9 +-
 .../redis/trident/WordCountTupleMapper.java     |  10 +-
 pom.xml                                         |   2 +-
 .../maven-shade-clojure-transformer/pom.xml     |   2 +-
 storm-buildtools/storm-maven-plugins/pom.xml    |   2 +-
 storm-core/pom.xml                              |   2 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  62 +++++-
 .../src/clj/backtype/storm/ui/helpers.clj       |  15 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   6 +
 .../jvm/backtype/storm/utils/NimbusClient.java  |  29 ++-
 storm-core/src/ui/public/css/style.css          |  62 ++++++
 .../storm/security/auth/nimbus_auth_test.clj    | 217 ++++++++++---------
 storm-dist/binary/pom.xml                       |   2 +-
 storm-dist/source/pom.xml                       |   2 +-
 71 files changed, 1425 insertions(+), 854 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/STORM-UI-REST-API.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 553434e,ab2b2d9..02c3d90
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -16,10 -16,12 +16,12 @@@
  
  (ns backtype.storm.ui.core
    (:use compojure.core)
-   (:use ring.middleware.reload)
+   (:use [clojure.java.shell :only [sh]])
+   (:use ring.middleware.reload
+         ring.middleware.multipart-params)
    (:use [ring.middleware.json :only [wrap-json-params]])
    (:use [hiccup core page-helpers])
 -  (:use [backtype.storm config util log])
 +  (:use [backtype.storm config util log zookeeper])
    (:use [backtype.storm.ui helpers])
    (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                                ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
@@@ -499,8 -505,41 +501,41 @@@
                (hashmap-to-persistent bolts))
         spout-comp-summs bolt-comp-summs window id))))
  
+ (defn validate-tplg-submit-params [params]
+   (let [tplg-jar-file (params :topologyJar)
+         tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))]
+     (cond
+      (nil? tplg-jar-file) {:valid false :error "missing topology jar file"}
+      (nil? tplg-config) {:valid false :error "missing topology config"}
+      (nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"}
+      :else {:valid true})))
+ 
+ (defn run-tplg-submit-cmd [tplg-jar-file tplg-config user]
+   (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
+         tplg-main-class-args (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyMainClassArgs")))
+         tplg-jvm-opts (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyJvmOpts")))
+         storm-home (System/getProperty "storm.home")
+         storm-conf-dir (str storm-home file-path-separator "conf")
+         storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir")
+                           (str storm-home file-path-separator "logs"))
+         storm-libs (str storm-home file-path-separator "lib" file-path-separator "*")
+         java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java")
+         storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
+         tplg-cmd-response (sh storm-cmd "jar" tplg-jar-file
+                               tplg-main-class
+                               tplg-main-class-args
+                               (if (not= user "unknown") (str "-c storm.doAsUser=" user) ""))]
+     (log-message "tplg-cmd-response " tplg-cmd-response)
+     (cond
+      (= (tplg-cmd-response :exit) 0) {"status" "success"}
+      (and (not= (tplg-cmd-response :exit) 0)
+           (not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"}
+           (not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))}
+           :else {"status" "success" "response" "topology deployed"}
+           )))
+ 
  (defn cluster-configuration []
 -  (with-nimbus nimbus
 +  (thrift/with-configured-nimbus-connection nimbus
      (.getNimbusConf ^Nimbus$Client nimbus)))
  
  (defn cluster-summary

http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index 071d2b6,6314deb..08610e9
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@@ -46,34 -41,23 +46,43 @@@ public class NimbusClient extends Thrif
      }
  
      public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
 -        try {
 -            if(conf.containsKey(Config.STORM_DO_AS_USER)) {
 -                if(asUser != null && !asUser.isEmpty()) {
 -                    LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
 -                            , asUser, conf.get(Config.STORM_DO_AS_USER));
++        if (conf.containsKey(Config.STORM_DO_AS_USER)) {
++            if (asUser != null && !asUser.isEmpty()) {
++                LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
++                        , asUser, conf.get(Config.STORM_DO_AS_USER));
++            }
++            asUser = (String) conf.get(Config.STORM_DO_AS_USER);
++        }
++
 +        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
-         for(String seed : seeds) {
++        for (String seed : seeds) {
 +            String[] split = seed.split(":");
 +            String host = split[0];
 +            int port = Integer.parseInt(split[1]);
 +            try {
-                 NimbusClient client = new NimbusClient(conf,host,port);
++                NimbusClient client = new NimbusClient(conf, host, port);
 +                ClusterSummary clusterInfo = client.getClient().getClusterInfo();
 +                List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
-                 if(nimbuses != null) {
-                     for(NimbusSummary nimbusSummary : nimbuses) {
-                         if(nimbusSummary.is_isLeader()) {
++                if (nimbuses != null) {
++                    for (NimbusSummary nimbusSummary : nimbuses) {
++                        if (nimbusSummary.is_isLeader()) {
 +                            return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
 +                        }
 +                    }
++                    throw new RuntimeException("Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
++                            "again after some time.");
                  }
-                 throw new RuntimeException("Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
-                         "again after some time.");
 -                asUser = (String) conf.get(Config.STORM_DO_AS_USER);
 +            } catch (Exception e) {
-                 LOG.warn("Ignoring exception while trying to get leader nimbus info from " + seed, e);
++                LOG.warn("Ignoring exception while trying to get leader nimbus info from " + seed
++                        + ". will retry with a different seed host.", e);
              }
 -            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
 -            return new NimbusClient(conf, nimbusHost, null, null, asUser);
 -        } catch (TTransportException ex) {
 -            throw new RuntimeException(ex);
          }
-         throw new RuntimeException("Could not find leader nimbus from seed hosts " + seeds +". " +
++        throw new RuntimeException("Could not find leader nimbus from seed hosts " + seeds + ". " +
 +                "Did you specify a valid list of nimbus host:port for config " + Config.NIMBUS_SEEDS);
      }
  
      public NimbusClient(Map conf, String host, int port) throws TTransportException {
--        this(conf, host, port, null);
++        this(conf, host, port, null, null);
      }
  
      public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {

http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
index 8908593,0a9c280..dff3042
--- a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
@@@ -56,120 -56,127 +56,125 @@@
        (testing/kill-local-storm-cluster cluster-map#)
        (.stop nimbus-server#)))
  
 -(deftest Simple-authentication-test 
 +(deftest Simple-authentication-test
-   (with-test-cluster [6627 nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
-     (let [storm-conf (merge (read-storm-config)
-                             {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
-                              STORM-NIMBUS-RETRY-TIMES 0})
-           client (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
-           nimbus_client (.getClient client)]
-       (testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
-                (is (thrown-cause? NotAliveException
-                             (.activate nimbus_client "topo-name"))))
-       (.close client))))
+   (let [port (available-port)]
+     (with-test-cluster [port nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
+       (let [storm-conf (merge (read-storm-config)
+                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+                                STORM-NIMBUS-RETRY-TIMES 0})
+             client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+             nimbus_client (.getClient client)]
+         (testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
+                  (is (thrown-cause? NotAliveException
+                               (.activate nimbus_client "topo-name"))))
+         (.close client)))))
 -  
 +
  (deftest test-noop-authorization-w-simple-transport
-   (with-test-cluster [6628 nil
-                 "backtype.storm.security.auth.authorizer.NoopAuthorizer"
-                 "backtype.storm.security.auth.SimpleTransportPlugin"]
-     (let [storm-conf (merge (read-storm-config)
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
-                               STORM-NIMBUS-RETRY-TIMES 0})
-           client (NimbusClient. storm-conf "localhost" 6628 nimbus-timeout)
-           nimbus_client (.getClient client)]
-       (testing "(Positive authorization) Authorization plugin should accept client request"
-                (is (thrown-cause? NotAliveException
-                             (.activate nimbus_client "topo-name"))))
-       (.close client))))
+   (let [port (available-port)]
+     (with-test-cluster [port nil
+                   "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+                   "backtype.storm.security.auth.SimpleTransportPlugin"]
+       (let [storm-conf (merge (read-storm-config)
+                                {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+                                 STORM-NIMBUS-RETRY-TIMES 0})
+             client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+             nimbus_client (.getClient client)]
+         (testing "(Positive authorization) Authorization plugin should accept client request"
+                  (is (thrown-cause? NotAliveException
+                               (.activate nimbus_client "topo-name"))))
+         (.close client)))))
  
  (deftest test-deny-authorization-w-simple-transport
-   (with-test-cluster [6629 nil
-                 "backtype.storm.security.auth.authorizer.DenyAuthorizer"
-                 "backtype.storm.security.auth.SimpleTransportPlugin"]
-     (let [storm-conf (merge (read-storm-config)
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
-                              Config/NIMBUS_THRIFT_PORT 6629
-                              STORM-NIMBUS-RETRY-TIMES 0})
-           client (NimbusClient. storm-conf "localhost" 6629 nimbus-timeout)
-           nimbus_client (.getClient client)
-           topologyInitialStatus (TopologyInitialStatus/findByValue 2)
-           submitOptions (SubmitOptions. topologyInitialStatus)]
-       (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client  "topo-name" nil nil nil)))
-       (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client  "topo-name" nil nil nil submitOptions)))
-       (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+   (let [port (available-port)]
+     (with-test-cluster [port nil
+                   "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+                   "backtype.storm.security.auth.SimpleTransportPlugin"]
+       (let [storm-conf (merge (read-storm-config)
+                                {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
 -                               Config/NIMBUS_HOST "localhost"
+                                Config/NIMBUS_THRIFT_PORT port
+                                STORM-NIMBUS-RETRY-TIMES 0})
 -            client (NimbusClient/getConfiguredClient storm-conf)
++            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+             nimbus_client (.getClient client)
+             topologyInitialStatus (TopologyInitialStatus/findByValue 2)
+             submitOptions (SubmitOptions. topologyInitialStatus)]
+         (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client  "topo-name" nil nil nil)))
+         (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client  "topo-name" nil nil nil submitOptions)))
+         (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
 +
-       (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
-       (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
-       (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
-       (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
-       (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
-       (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
-       (stubbing [nimbus/check-storm-active! nil
-                  nimbus/try-read-storm-conf-from-name {}]
-         (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
-         (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
-         (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
-         (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
-         (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil)))
-       )
-       (stubbing [nimbus/try-read-storm-conf {}]
-         (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
-         (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
-         (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
-         (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
-       (.close client))))
+         (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
+         (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
+         (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
+         (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
+         (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
+         (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
+         (stubbing [nimbus/check-storm-active! nil
+                    nimbus/try-read-storm-conf-from-name {}]
+           (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
+           (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
+           (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
+           (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
+           (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil)))
+         )
+         (stubbing [nimbus/try-read-storm-conf {}]
+           (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
+           (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
+           (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
+           (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
+         (.close client)))))
  
  (deftest test-noop-authorization-w-sasl-digest
-   (with-test-cluster [6630
-                 "test/clj/backtype/storm/security/auth/jaas_digest.conf"
-                 "backtype.storm.security.auth.authorizer.NoopAuthorizer"
-                 "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
-     (let [storm-conf (merge (read-storm-config)
-                             {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                              "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
-                              Config/NIMBUS_THRIFT_PORT 6630
-                              STORM-NIMBUS-RETRY-TIMES 0})
-           client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
-           nimbus_client (.getClient client)]
-       (testing "(Positive authorization) Authorization plugin should accept client request"
-                (is (thrown-cause? NotAliveException
-                             (.activate nimbus_client "topo-name"))))
-       (.close client))))
+   (let [port (available-port)]
+     (with-test-cluster [port
+                   "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                   "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+                   "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+       (let [storm-conf (merge (read-storm-config)
+                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
 -                               Config/NIMBUS_HOST "localhost"
+                                Config/NIMBUS_THRIFT_PORT port
+                                STORM-NIMBUS-RETRY-TIMES 0})
 -            client (NimbusClient/getConfiguredClient storm-conf)
++            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+             nimbus_client (.getClient client)]
+         (testing "(Positive authorization) Authorization plugin should accept client request"
+                  (is (thrown-cause? NotAliveException
+                               (.activate nimbus_client "topo-name"))))
+         (.close client)))))
  
  (deftest test-deny-authorization-w-sasl-digest
-   (with-test-cluster [6631
-                 "test/clj/backtype/storm/security/auth/jaas_digest.conf"
-                 "backtype.storm.security.auth.authorizer.DenyAuthorizer"
-                 "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
-     (let [storm-conf (merge (read-storm-config)
-                             {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                              "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
-                              Config/NIMBUS_THRIFT_PORT 6631
-                              STORM-NIMBUS-RETRY-TIMES 0})
-           client (NimbusClient. storm-conf "localhost" 6631 nimbus-timeout)
-           nimbus_client (.getClient client)
-           topologyInitialStatus (TopologyInitialStatus/findByValue 2)
-           submitOptions (SubmitOptions. topologyInitialStatus)]
-       (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client  "topo-name" nil nil nil)))
-       (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client  "topo-name" nil nil nil submitOptions)))
-       (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
-       (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
-       (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
-       (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
-       (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
-       (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
-       (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
-       (stubbing [nimbus/check-storm-active! nil
-                  nimbus/try-read-storm-conf-from-name {}]
-         (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
-         (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
-         (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
-         (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
-         (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil))))
-       (stubbing [nimbus/try-read-storm-conf {}]
-         (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
-         (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
-         (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
-         (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
-       (.close client))))
+   (let [port (available-port)]
+     (with-test-cluster [port
+                   "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                   "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+                   "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+       (let [storm-conf (merge (read-storm-config)
+                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
 -                               Config/NIMBUS_HOST "localhost"
+                                Config/NIMBUS_THRIFT_PORT port
+                                STORM-NIMBUS-RETRY-TIMES 0})
 -            client (NimbusClient/getConfiguredClient storm-conf)
++            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+             nimbus_client (.getClient client)
+             topologyInitialStatus (TopologyInitialStatus/findByValue 2)
+             submitOptions (SubmitOptions. topologyInitialStatus)]
+         (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client  "topo-name" nil nil nil)))
+         (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client  "topo-name" nil nil nil submitOptions)))
+         (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+         (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
+         (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
+         (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
+         (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
+         (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
+         (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
+         (stubbing [nimbus/check-storm-active! nil
+                    nimbus/try-read-storm-conf-from-name {}]
+           (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
+           (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
+           (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
+           (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
+           (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil))))
+         (stubbing [nimbus/try-read-storm-conf {}]
+           (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
+           (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
+           (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
+           (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
+         (.close client)))))