You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:50:16 UTC
[45/45] storm git commit: Merge remote-tracking branch
'apache/master' into nimbus-ha
Merge remote-tracking branch 'apache/master' into nimbus-ha
Conflicts:
storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/765e4c2f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/765e4c2f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/765e4c2f
Branch: refs/heads/nimbus-ha-branch
Commit: 765e4c2fabafbfe31fb0f43e5b670fd3b91d1a2e
Parents: a11fcc3 2aaa718
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Apr 1 10:44:56 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Apr 1 10:44:56 2015 -0700
----------------------------------------------------------------------
CHANGELOG.md | 13 ++
README.markdown | 2 +
STORM-UI-REST-API.md | 30 +++
docs/_posts/2015-03-25-storm094-released.md | 24 ++
docs/downloads.html | 64 ++++--
examples/storm-starter/pom.xml | 2 +-
external/storm-hbase/pom.xml | 2 +-
.../org/apache/storm/hbase/bolt/HBaseBolt.java | 2 +-
.../storm/hbase/bolt/HBaseLookupBolt.java | 4 +-
.../hbase/trident/state/HBaseMapState.java | 2 +
.../storm/hbase/trident/state/HBaseState.java | 2 +-
external/storm-hdfs/pom.xml | 2 +-
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 2 +-
.../storm/hdfs/bolt/SequenceFileBolt.java | 2 +-
external/storm-hive/pom.xml | 2 +-
external/storm-jdbc/pom.xml | 2 +-
external/storm-kafka/pom.xml | 2 +-
.../src/jvm/storm/kafka/KafkaUtils.java | 16 +-
external/storm-redis/README.md | 108 ++++++++-
external/storm-redis/pom.xml | 2 +-
.../storm/redis/bolt/AbstractRedisBolt.java | 8 +-
.../storm/redis/bolt/RedisLookupBolt.java | 112 ++++++++++
.../apache/storm/redis/bolt/RedisStoreBolt.java | 100 +++++++++
.../redis/common/config/JedisClusterConfig.java | 82 +++++++
.../redis/common/config/JedisPoolConfig.java | 97 +++++++++
.../common/container/JedisClusterContainer.java | 47 ++++
.../JedisCommandsContainerBuilder.java | 38 ++++
.../JedisCommandsInstanceContainer.java | 25 +++
.../redis/common/container/JedisContainer.java | 65 ++++++
.../common/mapper/RedisDataTypeDescription.java | 50 +++++
.../redis/common/mapper/RedisLookupMapper.java | 40 ++++
.../storm/redis/common/mapper/RedisMapper.java | 22 ++
.../redis/common/mapper/RedisStoreMapper.java | 21 ++
.../storm/redis/common/mapper/TupleMapper.java | 27 +++
.../trident/mapper/TridentTupleMapper.java | 27 ---
.../trident/state/RedisClusterMapState.java | 2 +-
.../redis/trident/state/RedisClusterState.java | 2 +-
.../trident/state/RedisClusterStateQuerier.java | 10 +-
.../trident/state/RedisClusterStateUpdater.java | 10 +-
.../redis/trident/state/RedisMapState.java | 21 +-
.../storm/redis/trident/state/RedisState.java | 2 +-
.../redis/trident/state/RedisStateQuerier.java | 10 +-
.../state/RedisStateSetCountQuerier.java | 74 -------
.../trident/state/RedisStateSetUpdater.java | 80 -------
.../redis/trident/state/RedisStateUpdater.java | 10 +-
.../redis/util/config/JedisClusterConfig.java | 82 -------
.../redis/util/config/JedisPoolConfig.java | 97 ---------
.../util/container/JedisClusterContainer.java | 47 ----
.../JedisCommandsContainerBuilder.java | 38 ----
.../JedisCommandsInstanceContainer.java | 25 ---
.../redis/util/container/JedisContainer.java | 65 ------
.../storm/redis/topology/LookupWordCount.java | 115 ++++++----
.../redis/topology/PersistentWordCount.java | 81 ++++---
.../storm/redis/topology/WordCounter.java | 19 +-
.../redis/trident/WordCountTridentRedis.java | 7 +-
.../trident/WordCountTridentRedisCluster.java | 6 +-
.../WordCountTridentRedisClusterMap.java | 8 +-
.../redis/trident/WordCountTridentRedisMap.java | 9 +-
.../redis/trident/WordCountTupleMapper.java | 10 +-
pom.xml | 2 +-
.../maven-shade-clojure-transformer/pom.xml | 2 +-
storm-buildtools/storm-maven-plugins/pom.xml | 2 +-
storm-core/pom.xml | 2 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 62 +++++-
.../src/clj/backtype/storm/ui/helpers.clj | 15 +-
storm-core/src/jvm/backtype/storm/Config.java | 6 +
.../jvm/backtype/storm/utils/NimbusClient.java | 29 ++-
storm-core/src/ui/public/css/style.css | 62 ++++++
.../storm/security/auth/nimbus_auth_test.clj | 217 ++++++++++---------
storm-dist/binary/pom.xml | 2 +-
storm-dist/source/pom.xml | 2 +-
71 files changed, 1425 insertions(+), 854 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/STORM-UI-REST-API.md
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 553434e,ab2b2d9..02c3d90
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -16,10 -16,12 +16,12 @@@
(ns backtype.storm.ui.core
(:use compojure.core)
- (:use ring.middleware.reload)
+ (:use [clojure.java.shell :only [sh]])
+ (:use ring.middleware.reload
+ ring.middleware.multipart-params)
(:use [ring.middleware.json :only [wrap-json-params]])
(:use [hiccup core page-helpers])
- (:use [backtype.storm config util log])
+ (:use [backtype.storm config util log zookeeper])
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
@@@ -499,8 -505,41 +501,41 @@@
(hashmap-to-persistent bolts))
spout-comp-summs bolt-comp-summs window id))))
+ (defn validate-tplg-submit-params [params]
+ (let [tplg-jar-file (params :topologyJar)
+ tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))]
+ (cond
+ (nil? tplg-jar-file) {:valid false :error "missing topology jar file"}
+ (nil? tplg-config) {:valid false :error "missing topology config"}
+ (nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"}
+ :else {:valid true})))
+
+ (defn run-tplg-submit-cmd [tplg-jar-file tplg-config user]
+ (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
+ tplg-main-class-args (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyMainClassArgs")))
+ tplg-jvm-opts (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyJvmOpts")))
+ storm-home (System/getProperty "storm.home")
+ storm-conf-dir (str storm-home file-path-separator "conf")
+ storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir")
+ (str storm-home file-path-separator "logs"))
+ storm-libs (str storm-home file-path-separator "lib" file-path-separator "*")
+ java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java")
+ storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
+ tplg-cmd-response (sh storm-cmd "jar" tplg-jar-file
+ tplg-main-class
+ tplg-main-class-args
+ (if (not= user "unknown") (str "-c storm.doAsUser=" user) ""))]
+ (log-message "tplg-cmd-response " tplg-cmd-response)
+ (cond
+ (= (tplg-cmd-response :exit) 0) {"status" "success"}
+ (and (not= (tplg-cmd-response :exit) 0)
+ (not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"}
+ (not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))}
+ :else {"status" "success" "response" "topology deployed"}
+ )))
+
(defn cluster-configuration []
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(.getNimbusConf ^Nimbus$Client nimbus)))
(defn cluster-summary
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index 071d2b6,6314deb..08610e9
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@@ -46,34 -41,23 +46,43 @@@ public class NimbusClient extends Thrif
}
public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
- try {
- if(conf.containsKey(Config.STORM_DO_AS_USER)) {
- if(asUser != null && !asUser.isEmpty()) {
- LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
- , asUser, conf.get(Config.STORM_DO_AS_USER));
++ if (conf.containsKey(Config.STORM_DO_AS_USER)) {
++ if (asUser != null && !asUser.isEmpty()) {
++ LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
++ , asUser, conf.get(Config.STORM_DO_AS_USER));
++ }
++ asUser = (String) conf.get(Config.STORM_DO_AS_USER);
++ }
++
+ List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
- for(String seed : seeds) {
++ for (String seed : seeds) {
+ String[] split = seed.split(":");
+ String host = split[0];
+ int port = Integer.parseInt(split[1]);
+ try {
- NimbusClient client = new NimbusClient(conf,host,port);
++ NimbusClient client = new NimbusClient(conf, host, port);
+ ClusterSummary clusterInfo = client.getClient().getClusterInfo();
+ List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
- if(nimbuses != null) {
- for(NimbusSummary nimbusSummary : nimbuses) {
- if(nimbusSummary.is_isLeader()) {
++ if (nimbuses != null) {
++ for (NimbusSummary nimbusSummary : nimbuses) {
++ if (nimbusSummary.is_isLeader()) {
+ return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
+ }
+ }
++ throw new RuntimeException("Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
++ "again after some time.");
}
- throw new RuntimeException("Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
- "again after some time.");
- asUser = (String) conf.get(Config.STORM_DO_AS_USER);
+ } catch (Exception e) {
- LOG.warn("Ignoring exception while trying to get leader nimbus info from " + seed, e);
++ LOG.warn("Ignoring exception while trying to get leader nimbus info from " + seed
++ + ". will retry with a different seed host.", e);
}
- String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
- return new NimbusClient(conf, nimbusHost, null, null, asUser);
- } catch (TTransportException ex) {
- throw new RuntimeException(ex);
}
- throw new RuntimeException("Could not find leader nimbus from seed hosts " + seeds +". " +
++ throw new RuntimeException("Could not find leader nimbus from seed hosts " + seeds + ". " +
+ "Did you specify a valid list of nimbus host:port for config " + Config.NIMBUS_SEEDS);
}
public NimbusClient(Map conf, String host, int port) throws TTransportException {
-- this(conf, host, port, null);
++ this(conf, host, port, null, null);
}
public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
http://git-wip-us.apache.org/repos/asf/storm/blob/765e4c2f/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
index 8908593,0a9c280..dff3042
--- a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
@@@ -56,120 -56,127 +56,125 @@@
(testing/kill-local-storm-cluster cluster-map#)
(.stop nimbus-server#)))
-(deftest Simple-authentication-test
+(deftest Simple-authentication-test
- (with-test-cluster [6627 nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
- (is (thrown-cause? NotAliveException
- (.activate nimbus_client "topo-name"))))
- (.close client))))
+ (let [port (available-port)]
+ (with-test-cluster [port nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
+ (is (thrown-cause? NotAliveException
+ (.activate nimbus_client "topo-name"))))
+ (.close client)))))
-
+
(deftest test-noop-authorization-w-simple-transport
- (with-test-cluster [6628 nil
- "backtype.storm.security.auth.authorizer.NoopAuthorizer"
- "backtype.storm.security.auth.SimpleTransportPlugin"]
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" 6628 nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Authorization plugin should accept client request"
- (is (thrown-cause? NotAliveException
- (.activate nimbus_client "topo-name"))))
- (.close client))))
+ (let [port (available-port)]
+ (with-test-cluster [port nil
+ "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+ "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (is (thrown-cause? NotAliveException
+ (.activate nimbus_client "topo-name"))))
+ (.close client)))))
(deftest test-deny-authorization-w-simple-transport
- (with-test-cluster [6629 nil
- "backtype.storm.security.auth.authorizer.DenyAuthorizer"
- "backtype.storm.security.auth.SimpleTransportPlugin"]
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
- Config/NIMBUS_THRIFT_PORT 6629
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" 6629 nimbus-timeout)
- nimbus_client (.getClient client)
- topologyInitialStatus (TopologyInitialStatus/findByValue 2)
- submitOptions (SubmitOptions. topologyInitialStatus)]
- (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client "topo-name" nil nil nil)))
- (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client "topo-name" nil nil nil submitOptions)))
- (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+ (let [port (available-port)]
+ (with-test-cluster [port nil
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
- Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT port
+ STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient/getConfiguredClient storm-conf)
++ client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+ nimbus_client (.getClient client)
+ topologyInitialStatus (TopologyInitialStatus/findByValue 2)
+ submitOptions (SubmitOptions. topologyInitialStatus)]
+ (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client "topo-name" nil nil nil)))
+ (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client "topo-name" nil nil nil submitOptions)))
+ (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+
- (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
- (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
- (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
- (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
- (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
- (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
- (stubbing [nimbus/check-storm-active! nil
- nimbus/try-read-storm-conf-from-name {}]
- (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
- (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
- (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
- (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
- (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil)))
- )
- (stubbing [nimbus/try-read-storm-conf {}]
- (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
- (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
- (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
- (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
- (.close client))))
+ (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
+ (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
+ (stubbing [nimbus/check-storm-active! nil
+ nimbus/try-read-storm-conf-from-name {}]
+ (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
+ (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil)))
+ )
+ (stubbing [nimbus/try-read-storm-conf {}]
+ (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
+ (.close client)))))
(deftest test-noop-authorization-w-sasl-digest
- (with-test-cluster [6630
- "test/clj/backtype/storm/security/auth/jaas_digest.conf"
- "backtype.storm.security.auth.authorizer.NoopAuthorizer"
- "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
- Config/NIMBUS_THRIFT_PORT 6630
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Authorization plugin should accept client request"
- (is (thrown-cause? NotAliveException
- (.activate nimbus_client "topo-name"))))
- (.close client))))
+ (let [port (available-port)]
+ (with-test-cluster [port
+ "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
- Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT port
+ STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient/getConfiguredClient storm-conf)
++ client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (is (thrown-cause? NotAliveException
+ (.activate nimbus_client "topo-name"))))
+ (.close client)))))
(deftest test-deny-authorization-w-sasl-digest
- (with-test-cluster [6631
- "test/clj/backtype/storm/security/auth/jaas_digest.conf"
- "backtype.storm.security.auth.authorizer.DenyAuthorizer"
- "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
- Config/NIMBUS_THRIFT_PORT 6631
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" 6631 nimbus-timeout)
- nimbus_client (.getClient client)
- topologyInitialStatus (TopologyInitialStatus/findByValue 2)
- submitOptions (SubmitOptions. topologyInitialStatus)]
- (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client "topo-name" nil nil nil)))
- (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client "topo-name" nil nil nil submitOptions)))
- (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
- (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
- (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
- (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
- (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
- (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
- (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
- (stubbing [nimbus/check-storm-active! nil
- nimbus/try-read-storm-conf-from-name {}]
- (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
- (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
- (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
- (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
- (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil))))
- (stubbing [nimbus/try-read-storm-conf {}]
- (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
- (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
- (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
- (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
- (.close client))))
+ (let [port (available-port)]
+ (with-test-cluster [port
+ "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
- Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT port
+ STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient/getConfiguredClient storm-conf)
++ client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+ nimbus_client (.getClient client)
+ topologyInitialStatus (TopologyInitialStatus/findByValue 2)
+ submitOptions (SubmitOptions. topologyInitialStatus)]
+ (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client "topo-name" nil nil nil)))
+ (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client "topo-name" nil nil nil submitOptions)))
+ (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
+ (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
+ (stubbing [nimbus/check-storm-active! nil
+ nimbus/try-read-storm-conf-from-name {}]
+ (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
+ (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil))))
+ (stubbing [nimbus/try-read-storm-conf {}]
+ (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
+ (.close client)))))