You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:06:57 UTC
[02/14] STORM-216: Added Authentication and Authorization.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index c7d7411..6fdd485 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@ -19,36 +19,40 @@
(:import [org.apache.thrift TException])
(:import [org.apache.thrift.transport TTransportException])
(:import [java.nio ByteBuffer])
+ (:import [java.security Principal AccessController])
+ (:import [javax.security.auth Subject])
+ (:import [java.net InetAddress])
(:import [backtype.storm Config])
+ (:import [backtype.storm.generated AuthorizationException])
(:import [backtype.storm.utils NimbusClient])
+ (:import [backtype.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
(:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient
- ReqContext])
+ ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap testing])
- (:import [backtype.storm.generated Nimbus Nimbus$Client])
- )
+ (:import [backtype.storm.generated Nimbus Nimbus$Client]))
(bootstrap)
-(def nimbus-timeout (Integer. 120))
+(defn mk-principal [name]
+ (reify Principal
+ (equals [this other]
+ (= name (.getName other)))
+ (getName [this] name)
+ (toString [this] name)
+ (hashCode [this] (.hashCode name))))
+
+(defn mk-subject [name]
+ (Subject. true #{(mk-principal name)} #{} #{}))
-(defn mk-authorization-handler [storm-conf]
- (let [klassname (storm-conf NIMBUS-AUTHORIZER)
- aznClass (if klassname (Class/forName klassname))
- aznHandler (if aznClass (.newInstance aznClass))]
- (if aznHandler (.prepare aznHandler storm-conf))
- (log-debug "authorization class name:" klassname
- " class:" aznClass
- " handler:" aznHandler)
- aznHandler
- ))
+(def nimbus-timeout (Integer. 120))
(defn nimbus-data [storm-conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf storm-conf
:inimbus inimbus
- :authorization-handler (mk-authorization-handler storm-conf)
+ :authorization-handler (mk-authorization-handler (storm-conf NIMBUS-AUTHORIZER) storm-conf)
:submitted-count (atom 0)
:storm-cluster-state nil
:submit-lock (Object.)
@@ -61,71 +65,75 @@
:scheduler nil
}))
-(defn check-authorization! [nimbus storm-name storm-conf operation]
- (let [aclHandler (:authorization-handler nimbus)]
- (log-debug "check-authorization with handler: " aclHandler)
- (if aclHandler
- (if-not (.permit aclHandler
- (ReqContext/context)
- operation
- (if storm-conf storm-conf {TOPOLOGY-NAME storm-name}))
- (throw (RuntimeException. (str operation " on topology " storm-name " is not authorized")))
- ))))
-
-(defn dummy-service-handler [conf inimbus]
- (let [nimbus (nimbus-data conf inimbus)]
- (reify Nimbus$Iface
- (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
- ^SubmitOptions submitOptions]
- (check-authorization! nimbus storm-name nil "submitTopology"))
-
- (^void killTopology [this ^String storm-name]
- (check-authorization! nimbus storm-name nil "killTopology"))
-
- (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
- (check-authorization! nimbus storm-name nil "killTopology"))
-
- (^void rebalance [this ^String storm-name ^RebalanceOptions options]
- (check-authorization! nimbus storm-name nil "rebalance"))
-
- (activate [this storm-name]
- (check-authorization! nimbus storm-name nil "activate"))
-
- (deactivate [this storm-name]
- (check-authorization! nimbus storm-name nil "deactivate"))
-
- (beginFileUpload [this])
-
- (^void uploadChunk [this ^String location ^ByteBuffer chunk])
-
- (^void finishFileUpload [this ^String location])
-
- (^String beginFileDownload [this ^String file])
-
- (^ByteBuffer downloadChunk [this ^String id])
-
- (^String getNimbusConf [this])
+(defn dummy-service-handler
+ ([conf inimbus auth-context]
+ (let [nimbus-d (nimbus-data conf inimbus)
+ topo-conf (atom nil)]
+ (reify Nimbus$Iface
+ (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
+ ^SubmitOptions submitOptions]
+ (if (not (nil? serializedConf)) (swap! topo-conf (fn [prev new] new) (from-json serializedConf)))
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "submitTopology" auth-context))
+
+ (^void killTopology [this ^String storm-name]
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "killTopology" auth-context))
+
+ (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "killTopology" auth-context))
+
+ (^void rebalance [this ^String storm-name ^RebalanceOptions options]
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "rebalance" auth-context))
+
+ (activate [this storm-name]
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "activate" auth-context))
+
+ (deactivate [this storm-name]
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "deactivate" auth-context))
- (^String getTopologyConf [this ^String id])
+ (uploadNewCredentials [this storm-name creds]
+ (nimbus/check-authorization! nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context))
+
+ (beginFileUpload [this])
+
+ (^void uploadChunk [this ^String location ^ByteBuffer chunk])
+
+ (^void finishFileUpload [this ^String location])
+
+ (^String beginFileDownload [this ^String file]
+ (nimbus/check-authorization! nimbus-d nil nil "fileDownload" auth-context)
+ "Done!")
+
+ (^ByteBuffer downloadChunk [this ^String id])
+
+ (^String getNimbusConf [this])
+
+ (^String getTopologyConf [this ^String id])
+
+ (^StormTopology getTopology [this ^String id])
+
+ (^StormTopology getUserTopology [this ^String id])
+
+ (^ClusterSummary getClusterInfo [this])
+
+ (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
+ ([conf inimbus]
+ (dummy-service-handler conf inimbus nil)))
+
- (^StormTopology getTopology [this ^String id])
-
- (^StormTopology getUserTopology [this ^String id])
-
- (^ClusterSummary getClusterInfo [this])
-
- (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
-
-(defn launch-server [server-port login-cfg aznClass transportPluginClass]
+(defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf]
(let [conf1 (merge (read-storm-config)
- {NIMBUS-AUTHORIZER aznClass
- NIMBUS-HOST "localhost"
- NIMBUS-THRIFT-PORT server-port
- STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
- conf (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
+ {NIMBUS-AUTHORIZER aznClass
+ NIMBUS-HOST "localhost"
+ NIMBUS-THRIFT-PORT server-port
+ STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
+ conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
+ conf (if serverConf (merge conf2 serverConf) conf2)
nimbus (nimbus/standalone-nimbus)
service-handler (dummy-service-handler conf nimbus)
- server (ThriftServer. conf (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
+ server (ThriftServer.
+ conf
+ (Nimbus$Processor. service-handler)
+ ThriftConnectionType/NIMBUS)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
(.start (Thread. #(.serve server)))
(wait-for-condition #(.isServing server))
@@ -133,32 +141,172 @@
(defmacro with-server [args & body]
`(let [server# (launch-server ~@args)]
- ~@body
- (.stop server#)
- ))
+ ~@body
+ (.stop server#)
+ ))
-(deftest Simple-authentication-test
+(deftest kerb-to-local-test
+ (let [kptol (KerberosPrincipalToLocal. )]
+ (.prepare kptol {})
+ (is (= "me" (.toLocal kptol (mk-principal "me@realm"))))
+ (is (= "simple" (.toLocal kptol (mk-principal "simple"))))
+ (is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm"))))))
+
+(deftest Simple-authentication-test
(let [a-port (available-port)]
- (with-server [a-port nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (with-server [a-port nil nil "backtype.storm.security.auth.SimpleTransportPlugin" nil]
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
nimbus_client (.getClient client)]
(.activate nimbus_client "security_auth_test_topology")
(.close client))
-
+
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})]
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
(testing "(Negative authentication) Server: Simple vs. Client: Digest"
(is (thrown-cause? java.net.SocketTimeoutException
(NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
+(deftest negative-whitelist-authorization-test
+ (let [a-port (available-port)]
+ (with-server [a-port nil
+ "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
+ "backtype.storm.testing.SingleUserSimpleTransport" nil]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.testing.SingleUserSimpleTransport"})
+ client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Negative authorization) Authorization plugin should reject client request"
+ (is (thrown-cause? AuthorizationException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client)))))
+
+(deftest positive-whitelist-authorization-test
+ (let [a-port (available-port)]
+ (with-server [a-port nil
+ "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
+ "backtype.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.testing.SingleUserSimpleTransport"})
+ client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (.activate nimbus_client "security_auth_test_topology"))
+ (.close client)))))
+
+(deftest simple-acl-user-auth-test
+ (let [cluster-conf (merge (read-storm-config)
+ {NIMBUS-ADMINS ["admin"]
+ NIMBUS-SUPERVISOR-USERS ["supervisor"]})
+ authorizer (SimpleACLAuthorizer. )
+ admin-user (mk-subject "admin")
+ supervisor-user (mk-subject "supervisor")
+ user-a (mk-subject "user-a")
+ user-b (mk-subject "user-b")]
+ (.prepare authorizer cluster-conf)
+ (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
+ (is (= true (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "submitTopology" {})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "fileUpload" nil)))
+ (is (= true (.permit authorizer (ReqContext. user-b) "fileUpload" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "fileUpload" nil)))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "getNimbusConf" nil)))
+ (is (= true (.permit authorizer (ReqContext. user-b) "getNimbusConf" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "getNimbusConf" nil)))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "getClusterInfo" nil)))
+ (is (= true (.permit authorizer (ReqContext. user-b) "getClusterInfo" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "getClusterInfo" nil)))
+
+ (is (= false (.permit authorizer (ReqContext. user-a) "fileDownload" nil)))
+ (is (= false (.permit authorizer (ReqContext. user-b) "fileDownload" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
+ (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "killTopolgy" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "activate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "activate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+
+ (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+))
+
+(deftest simple-acl-same-user-auth-test
+ (let [cluster-conf (merge (read-storm-config)
+ {NIMBUS-ADMINS ["admin"]
+ NIMBUS-SUPERVISOR-USERS ["admin"]})
+ authorizer (SimpleACLAuthorizer. )
+ admin-user (mk-subject "admin")]
+ (.prepare authorizer cluster-conf)
+ (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+ (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+))
+
+
(deftest positive-authorization-test
(let [a-port (available-port)]
- (with-server [a-port nil
+ (with-server [a-port nil
"backtype.storm.security.auth.authorizer.NoopAuthorizer"
- "backtype.storm.security.auth.SimpleTransportPlugin"]
+ "backtype.storm.security.auth.SimpleTransportPlugin" nil]
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
@@ -171,7 +319,7 @@
(let [a-port (available-port)]
(with-server [a-port nil
"backtype.storm.security.auth.authorizer.DenyAuthorizer"
- "backtype.storm.security.auth.SimpleTransportPlugin"]
+ "backtype.storm.security.auth.SimpleTransportPlugin" nil]
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
Config/NIMBUS_HOST "localhost"
@@ -180,19 +328,20 @@
client (NimbusClient/getConfiguredClient storm-conf)
nimbus_client (.getClient client)]
(testing "(Negative authorization) Authorization plugin should reject client request"
- (is (thrown? TTransportException
- (.activate nimbus_client "security_auth_test_topology"))))
+ (is (thrown-cause? AuthorizationException
+ (.activate nimbus_client "security_auth_test_topology"))))
(.close client)))))
(deftest digest-authentication-test
(let [a-port (available-port)]
- (with-server [a-port
+ (with-server [a-port
"test/clj/backtype/storm/security/auth/jaas_digest.conf"
nil
- "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
nimbus_client (.getClient client)]
(testing "(Positive authentication) valid digest authentication"
@@ -200,7 +349,8 @@
(.close client))
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
nimbus_client (.getClient client)]
(testing "(Negative authentication) Server: Digest vs. Client: Simple"
@@ -210,34 +360,37 @@
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"})]
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
(testing "(Negative authentication) Invalid password"
- (is (thrown? TTransportException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
+ (is (thrown-cause? TTransportException
+ (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"})]
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
(testing "(Negative authentication) Unknown user"
- (is (thrown? TTransportException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
+ (is (thrown-cause? TTransportException
+ (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
+
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/nonexistent.conf"})]
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/nonexistent.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
(testing "(Negative authentication) nonexistent configuration file"
- (is (thrown? RuntimeException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
+ (is (thrown-cause? RuntimeException
+ (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"})]
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
(testing "(Negative authentication) Missing client"
(is (thrown-cause? java.io.IOException
(NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
-
-
+
(deftest test-GetTransportPlugin-throws-RuntimeException
(let [conf (merge (read-storm-config)
{Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
- (is (thrown? RuntimeException (AuthUtils/GetTransportPlugin conf nil)))))
+ (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj b/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj
new file mode 100644
index 0000000..d592a40
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj
@@ -0,0 +1,226 @@
+(ns backtype.storm.security.auth.authorizer.DRPCSimpleAclAuthorizer-test
+ (:use [clojure test])
+ (:import [org.mockito Mockito])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.security.auth ReqContext SingleUserPrincipal])
+ (:import [backtype.storm.security.auth.authorizer DRPCSimpleACLAuthorizer])
+ (:use [backtype.storm config util])
+ )
+
+(defn- mk-mock-context [user]
+ (let [mock-context (Mockito/mock ReqContext)]
+ (. (Mockito/when (.principal mock-context)) thenReturn
+ (SingleUserPrincipal. user))
+ mock-context))
+
+(let [function "jump"
+ partial-function "partial"
+ alice-context (mk-mock-context "alice")
+ alice-kerb-context (mk-mock-context "alice@SOME.RELM")
+ bob-context (mk-mock-context "bob")
+ charlie-context (mk-mock-context "charlie")
+ acl-file "drpc-simple-acl-test-scenario.yaml"
+ strict-handler (doto (DRPCSimpleACLAuthorizer.)
+ (.prepare {DRPC-AUTHORIZER-ACL-STRICT true
+ DRPC-AUTHORIZER-ACL-FILENAME acl-file
+ STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.KerberosPrincipalToLocal"}))
+ permissive-handler (doto (DRPCSimpleACLAuthorizer.)
+ (.prepare {DRPC-AUTHORIZER-ACL-STRICT false
+ DRPC-AUTHORIZER-ACL-FILENAME acl-file
+ STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.KerberosPrincipalToLocal"}))]
+
+ (deftest test-partial-authorization
+ (testing "deny execute to unauthorized user"
+ (is (not
+ (.permit strict-handler
+ (ReqContext/context)
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY partial-function}))))
+
+ (testing "allow execute to authorized kerb user for correct function"
+ (is
+ (.permit
+ strict-handler
+ alice-kerb-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY partial-function})))
+
+ (testing "deny fetchRequest to unauthorized user for correct function"
+ (is (not
+ (.permit
+ strict-handler
+ alice-kerb-context
+ "fetchRequest"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY partial-function}))))
+ )
+
+ (deftest test-client-authorization-strict
+ (testing "deny execute to unauthorized user"
+ (is (not
+ (.permit strict-handler
+ (ReqContext/context)
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+ (testing "deny execute to valid user for incorrect function"
+ (is (not
+ (.permit
+ strict-handler
+ alice-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"}))))
+
+ (testing "allow execute to authorized kerb user for correct function"
+ (is
+ (.permit
+ strict-handler
+ alice-kerb-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+ (testing "allow execute to authorized user for correct function"
+ (is
+ (.permit
+ strict-handler
+ alice-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+
+ (deftest test-client-authorization-permissive
+ (testing "deny execute to unauthorized user for correct function"
+ (is (not
+ (.permit permissive-handler
+ (ReqContext/context)
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+ (testing "allow execute for user for incorrect function when permissive"
+ (is
+ (.permit permissive-handler
+ alice-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"})))
+
+ (testing "allow execute for user for incorrect function when permissive"
+ (is
+ (.permit permissive-handler
+ alice-kerb-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"})))
+
+ (testing "allow execute to authorized user for correct function"
+ (is
+ (.permit permissive-handler
+ bob-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+ (deftest test-invocation-authorization-strict
+ (doseq [operation ["fetchRequest" "failRequest" "result"]]
+
+ (testing (str "deny " operation
+ " to unauthorized user for correct function")
+ (is (not
+ (.permit
+ strict-handler
+ alice-context
+ operation
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+ (testing (str "deny " operation
+ " to user for incorrect function when strict")
+ (is (not
+ (.permit
+ strict-handler
+ charlie-context
+ operation
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"}))))
+
+ (testing (str "allow " operation
+ " to authorized user for correct function")
+ (is
+ (.permit
+ strict-handler
+ charlie-context
+ operation
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))))
+
+ (deftest test-invocation-authorization-permissive
+ (doseq [operation ["fetchRequest" "failRequest" "result"]]
+
+ (testing (str "deny " operation
+ " to unauthorized user for correct function")
+ (is (not
+ (.permit
+ permissive-handler
+ bob-context
+ operation
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+ (testing (str "allow " operation
+ " to user for incorrect function when permissive")
+ (is
+ (.permit
+ permissive-handler
+ charlie-context
+ operation
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"})))
+
+ (testing (str operation " is allowed for authorized user")
+ (is
+ (.permit
+ permissive-handler
+ charlie-context
+ operation
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))))
+
+ (deftest test-deny-when-no-function-given
+ (is (not
+ (.permit strict-handler alice-context "execute" {})))
+
+ (is (not
+ (.permit
+ strict-handler
+ alice-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY nil})))
+
+ (is (not
+ (.permit permissive-handler bob-context "execute" {})))
+
+ (is (not
+ (.permit
+ permissive-handler
+ bob-context
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY nil}))))
+
+ (deftest test-deny-when-invalid-user-given
+ (is (not
+ (.permit
+ strict-handler
+ (Mockito/mock ReqContext)
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+ (is (not
+ (.permit
+ strict-handler
+ nil
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+ (is (not
+ (.permit
+ permissive-handler
+ (Mockito/mock ReqContext)
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+ (is (not
+ (.permit
+ permissive-handler
+ nil
+ "execute"
+ {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj
new file mode 100644
index 0000000..2056509
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj
@@ -0,0 +1,91 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.security.auth.auto-login-module-test
+ (:use [clojure test])
+ (:use [backtype.storm util])
+ (:import [backtype.storm.security.auth.kerberos AutoTGT
+ AutoTGTKrb5LoginModule AutoTGTKrb5LoginModuleTest])
+ (:import [javax.security.auth Subject Subject])
+ (:import [javax.security.auth.kerberos KerberosTicket])
+ (:import [org.mockito Mockito])
+ )
+
+(deftest login-module-no-subj-no-tgt-test
+ (testing "Behavior is correct when there is no Subject or TGT"
+ (let [login-module (AutoTGTKrb5LoginModule.)]
+
+ (is (thrown-cause? javax.security.auth.login.LoginException
+ (.login login-module)))
+ (is (not (.commit login-module)))
+ (is (not (.abort login-module)))
+ (is (.logout login-module)))))
+
+(deftest login-module-readonly-subj-no-tgt-test
+ (testing "Behavior is correct when there is a read-only Subject and no TGT"
+ (let [readonly-subj (Subject. true #{} #{} #{})
+ login-module (AutoTGTKrb5LoginModule.)]
+ (.initialize login-module readonly-subj nil nil nil)
+ (is (not (.commit login-module)))
+ (is (.logout login-module)))))
+
+(deftest login-module-with-subj-no-tgt-test
+ (testing "Behavior is correct when there is a Subject and no TGT"
+ (let [login-module (AutoTGTKrb5LoginModule.)]
+ (.initialize login-module (Subject.) nil nil nil)
+ (is (thrown-cause? javax.security.auth.login.LoginException
+ (.login login-module)))
+ (is (not (.commit login-module)))
+ (is (not (.abort login-module)))
+ (is (.logout login-module)))))
+
+(deftest login-module-no-subj-with-tgt-test
+ (testing "Behavior is correct when there is no Subject and a TGT"
+ (let [login-module (AutoTGTKrb5LoginModuleTest.)]
+ (.setKerbTicket login-module (Mockito/mock KerberosTicket))
+ (is (.login login-module))
+ (is (thrown-cause? javax.security.auth.login.LoginException
+ (.commit login-module)))
+
+ (.setKerbTicket login-module (Mockito/mock KerberosTicket))
+ (is (.abort login-module))
+ (is (.logout login-module)))))
+
+(deftest login-module-readonly-subj-with-tgt-test
+ (testing "Behavior is correct when there is a read-only Subject and a TGT"
+ (let [readonly-subj (Subject. true #{} #{} #{})
+ login-module (AutoTGTKrb5LoginModuleTest.)]
+ (.initialize login-module readonly-subj nil nil nil)
+ (.setKerbTicket login-module (Mockito/mock KerberosTicket))
+ (is (.login login-module))
+ (is (thrown-cause? javax.security.auth.login.LoginException
+ (.commit login-module)))
+
+ (.setKerbTicket login-module (Mockito/mock KerberosTicket))
+ (is (.abort login-module))
+ (is (.logout login-module)))))
+
+(deftest login-module-with-subj-and-tgt
+ (testing "Behavior is correct when there is a Subject and a TGT"
+ (let [login-module (AutoTGTKrb5LoginModuleTest.)
+ _ (set! (. login-module client) (Mockito/mock
+ java.security.Principal))
+ ticket (Mockito/mock KerberosTicket)]
+ (.initialize login-module (Subject.) nil nil nil)
+ (.setKerbTicket login-module ticket)
+ (is (.login login-module))
+ (is (.commit login-module))
+ (is (.abort login-module))
+ (is (.logout login-module)))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas
new file mode 100644
index 0000000..cd691ae
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas
@@ -0,0 +1,5 @@
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="alice"
+ password="poorpasswordforalice";
+};
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas
new file mode 100644
index 0000000..e4ca097
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas
@@ -0,0 +1,5 @@
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="poorpasswordforbob";
+};
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas
new file mode 100644
index 0000000..3473d6d
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas
@@ -0,0 +1,5 @@
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="charlie"
+ password="poorpasswordforcharlie";
+};
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas
new file mode 100644
index 0000000..3b22d21
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas
@@ -0,0 +1,6 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_alice="poorpasswordforalice"
+ user_bob="poorpasswordforbob"
+ user_charlie="poorpasswordforcharlie";
+};
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj
new file mode 100644
index 0000000..ff431ec
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj
@@ -0,0 +1,315 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.security.auth.drpc-auth-test
+ (:use [clojure test])
+ (:require [backtype.storm.daemon [drpc :as drpc]])
+ (:import [backtype.storm.generated AuthorizationException
+ DRPCExecutionException DistributedRPC$Processor
+ DistributedRPCInvocations$Processor])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.security.auth ReqContext SingleUserPrincipal ThriftServer ThriftConnectionType])
+ (:import [backtype.storm.utils DRPCClient])
+ (:import [backtype.storm.drpc DRPCInvocationsClient])
+ (:import [java.util.concurrent TimeUnit])
+ (:import [javax.security.auth Subject])
+ (:use [backtype.storm bootstrap util])
+ (:use [backtype.storm.daemon common])
+ (:use [backtype.storm bootstrap testing])
+ )
+
+(bootstrap)
+
+(def drpc-timeout (Integer. 30))
+
+(defn launch-server [conf drpcAznClass transportPluginClass login-cfg client-port invocations-port]
+ (let [conf (if drpcAznClass (assoc conf DRPC-AUTHORIZER drpcAznClass) conf)
+ conf (if transportPluginClass (assoc conf STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass) conf)
+ conf (if login-cfg (assoc conf "java.security.auth.login.config" login-cfg) conf)
+ conf (assoc conf DRPC-PORT client-port)
+ conf (assoc conf DRPC-INVOCATIONS-PORT invocations-port)
+ service-handler (drpc/service-handler conf)
+ handler-server (ThriftServer. conf
+ (DistributedRPC$Processor. service-handler)
+ ThriftConnectionType/DRPC)
+ invoke-server (ThriftServer. conf
+ (DistributedRPCInvocations$Processor. service-handler)
+ ThriftConnectionType/DRPC_INVOCATIONS)]
+ (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
+ (log-message "storm conf:" conf)
+ (log-message "Starting DRPC invocation server ...")
+ (.start (Thread. #(.serve invoke-server)))
+ (wait-for-condition #(.isServing invoke-server))
+ (log-message "Starting DRPC handler server ...")
+ (.start (Thread. #(.serve handler-server)))
+ (wait-for-condition #(.isServing handler-server))
+ [handler-server invoke-server]))
+
+(defmacro with-server [args & body]
+ `(let [[handler-server# invoke-server#] (launch-server ~@args)]
+ ~@body
+ (log-message "Stopping DRPC servers ...")
+ (.stop handler-server#)
+ (.stop invoke-server#)
+ ))
+
+(deftest deny-drpc-test
+ (let [client-port (available-port)
+ invocations-port (available-port (inc client-port))
+ storm-conf (read-storm-config)]
+ (with-server [storm-conf "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ nil nil client-port invocations-port]
+ (let [drpc (DRPCClient. storm-conf "localhost" client-port)
+ drpc_client (.getClient drpc)
+ invocations (DRPCInvocationsClient. storm-conf "localhost" invocations-port)
+ invocations_client (.getClient invocations)]
+ (is (thrown? AuthorizationException (.execute drpc_client "func-foo" "args-bar")))
+ (is (thrown? AuthorizationException (.fetchRequest invocations_client nil)))
+ (.close drpc)
+ (.close invocations)))))
+
+(deftest deny-drpc-digest-test
+ (let [client-port (available-port)
+ invocations-port (available-port (inc client-port))
+ storm-conf (read-storm-config)]
+ (with-server [storm-conf "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ client-port invocations-port]
+ (let [conf (merge storm-conf {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})
+ drpc (DRPCClient. conf "localhost" client-port)
+ drpc_client (.getClient drpc)
+ invocations (DRPCInvocationsClient. conf "localhost" invocations-port)
+ invocations_client (.getClient invocations)]
+ (is (thrown? AuthorizationException (.execute drpc_client "func-foo" "args-bar")))
+ (is (thrown? AuthorizationException (.fetchRequest invocations_client nil)))
+ (.close drpc)
+ (.close invocations)))))
+
+(defmacro with-simple-drpc-test-scenario
+ [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
+ (let [client-port (available-port)
+ invocations-port (available-port (inc client-port))
+ storm-conf (merge (read-storm-config)
+ {DRPC-AUTHORIZER-ACL-STRICT strict?
+ DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
+ STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"})]
+ `(with-server [~storm-conf
+ "backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "test/clj/backtype/storm/security/auth/drpc-auth-server.jaas"
+ ~client-port ~invocations-port]
+ (let [~alice-client (DRPCClient.
+ (merge ~storm-conf {"java.security.auth.login.config"
+ "test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas"})
+ "localhost"
+ ~client-port)
+ ~bob-client (DRPCClient.
+ (merge ~storm-conf {"java.security.auth.login.config"
+ "test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas"})
+ "localhost"
+ ~client-port)
+ ~charlie-client (DRPCClient.
+ (merge ~storm-conf {"java.security.auth.login.config"
+ "test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas"})
+ "localhost"
+ ~client-port)
+ ~alice-invok (DRPCInvocationsClient.
+ (merge ~storm-conf {"java.security.auth.login.config"
+ "test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas"})
+ "localhost"
+ ~invocations-port)
+ ~charlie-invok (DRPCInvocationsClient.
+ (merge ~storm-conf {"java.security.auth.login.config"
+ "test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas"})
+ "localhost"
+ ~invocations-port)]
+ (try
+ ~@body
+ (finally
+ (.close ~alice-client)
+ (.close ~bob-client)
+ (.close ~charlie-client)
+ (.close ~alice-invok)
+ (.close ~charlie-invok)))))))
+
+(deftest drpc-per-function-auth-strict-test
+ (with-simple-drpc-test-scenario [true alice-client bob-client charlie-client alice-invok charlie-invok]
+ (let [drpc-timeout-seconds 10]
+ (testing "Permitted user can execute a function in the ACL"
+ (let [func "jump"
+ exec-ftr (future (.execute alice-client func "some args"))
+ id (atom "")
+ expected "Authorized DRPC"]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (.result charlie-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "execute fails when function is not in ACL"
+ (is (thrown-cause? AuthorizationException
+ (.execute alice-client "jog" "some args"))))
+
+ (testing "fetchRequest fails when function is not in ACL"
+ (is (thrown-cause? AuthorizationException
+ (.fetchRequest charlie-invok "jog"))))
+
+ (testing "authorized user can fail a request"
+ (let [func "jump"
+ exec-ftr (future (.execute alice-client func "some args"))
+ id (atom "")]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (.failRequest charlie-invok @id)
+ (is (thrown-cause? DRPCExecutionException
+ (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "unauthorized invocation user is denied returning a result"
+ (let [func "jump"
+ exec-ftr (future (.execute bob-client func "some args"))
+ id (atom "")
+ expected "Only Authorized User can populate the result"]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (is (thrown-cause? AuthorizationException
+ (.result alice-invok @id "not the expected result")))
+ (.result charlie-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "unauthorized invocation user is denied failing a request"
+ (let [func "jump"
+ exec-ftr (future (.execute alice-client func "some args"))
+ id (atom "")]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (is (thrown-cause? AuthorizationException
+ (.failRequest alice-invok @id)))
+ (.failRequest charlie-invok @id))))
+
+ (testing "unauthorized invocation user is denied fetching a request"
+ (let [func "jump"
+ exec-ftr (future (.execute bob-client func "some args"))
+ id (atom "")
+ expected "Only authorized users can fetchRequest"]
+ (Thread/sleep 1000)
+ (is (thrown-cause? AuthorizationException
+ (-> alice-invok (.fetchRequest func) .get_request_id)))
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (.result charlie-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))
+
+(deftest drpc-per-function-auth-non-strict-test
+ (with-simple-drpc-test-scenario [false alice-client bob-client charlie-client alice-invok charlie-invok]
+ (let [drpc-timeout-seconds 10]
+ (testing "Permitted user can execute a function in the ACL"
+ (let [func "jump"
+ exec-ftr (future (.execute alice-client func "some args"))
+ id (atom "")
+ expected "Authorized DRPC"]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (.result charlie-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "DRPC succeeds for anyone when function is not in ACL"
+ (let [func "jog"
+ exec-ftr (future (.execute charlie-client func "some args"))
+ id (atom "")
+ expected "Permissive/No ACL Entry"]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> alice-invok (.fetchRequest func) .get_request_id)))
+ (.result alice-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "failure of a request is allowed when function is not in ACL"
+ (let [func "jog"
+ exec-ftr (future (.execute charlie-client func "some args"))
+ id (atom "")]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> alice-invok (.fetchRequest func) .get_request_id)))
+ (.failRequest alice-invok @id)
+ (is (thrown-cause? DRPCExecutionException
+ (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "authorized user can fail a request"
+ (let [func "jump"
+ exec-ftr (future (.execute alice-client func "some args"))
+ id (atom "")]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (.failRequest charlie-invok @id)
+ (is (thrown-cause? DRPCExecutionException
+ (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "unauthorized invocation user is denied returning a result"
+ (let [func "jump"
+ exec-ftr (future (.execute bob-client func "some args"))
+ id (atom "")
+ expected "Only Authorized User can populate the result"]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (is (thrown-cause? AuthorizationException
+ (.result alice-invok @id "not the expected result")))
+ (.result charlie-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+ (testing "unauthorized invocation user is denied failing a request"
+ (let [func "jump"
+ exec-ftr (future (.execute alice-client func "some args"))
+ id (atom "")]
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (is (thrown-cause? AuthorizationException
+ (.failRequest alice-invok @id)))
+ (.failRequest charlie-invok @id))))
+
+ (testing "unauthorized invocation user is denied fetching a request"
+ (let [func "jump"
+ exec-ftr (future (.execute bob-client func "some args"))
+ id (atom "")
+ expected "Only authorized users can fetchRequest"]
+ (Thread/sleep 1000)
+ (is (thrown-cause? AuthorizationException
+ (-> alice-invok (.fetchRequest func) .get_request_id)))
+ (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+ (while (empty? @id)
+ (reset! id
+ (-> charlie-invok (.fetchRequest func) .get_request_id)))
+ (.result charlie-invok @id expected)
+ (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
new file mode 100644
index 0000000..bb70239
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
@@ -0,0 +1,181 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.security.auth.nimbus-auth-test
+ (:use [clojure test])
+ (:require [backtype.storm [testing :as testing]])
+ (:require [backtype.storm.daemon [nimbus :as nimbus]])
+ (:require [backtype.storm [zookeeper :as zk]])
+ (:import [java.nio ByteBuffer])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.utils NimbusClient])
+ (:import [backtype.storm.generated NotAliveException])
+ (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient
+ ReqContext ThriftConnectionType])
+ (:use [backtype.storm bootstrap cluster util])
+ (:use [backtype.storm.daemon common nimbus])
+ (:use [backtype.storm bootstrap])
+ (:import [backtype.storm.generated Nimbus Nimbus$Client
+ AuthorizationException SubmitOptions TopologyInitialStatus KillOptions])
+ (:require [conjure.core])
+ (:use [conjure core])
+ )
+
+(bootstrap)
+
+(def nimbus-timeout (Integer. 30))
+
+(defn launch-test-cluster [nimbus-port login-cfg aznClass transportPluginClass]
+ (let [conf {NIMBUS-AUTHORIZER aznClass
+ NIMBUS-THRIFT-PORT nimbus-port
+ STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass }
+ conf (if login-cfg (merge conf {"java.security.auth.login.config" login-cfg}) conf)
+ cluster-map (testing/mk-local-storm-cluster :supervisors 0
+ :ports-per-supervisor 0
+ :daemon-conf conf)
+ nimbus-server (ThriftServer. (:daemon-conf cluster-map)
+ (Nimbus$Processor. (:nimbus cluster-map))
+ ThriftConnectionType/NIMBUS)]
+ (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop nimbus-server))))
+ (.start (Thread. #(.serve nimbus-server)))
+ (wait-for-condition #(.isServing nimbus-server))
+ [cluster-map nimbus-server]))
+
+(defmacro with-test-cluster [args & body]
+ `(let [[cluster-map# nimbus-server#] (launch-test-cluster ~@args)]
+ ~@body
+ (log-debug "Shutdown cluster from macro")
+ (testing/kill-local-storm-cluster cluster-map#)
+ (.stop nimbus-server#)))
+
+(deftest Simple-authentication-test
+ (with-test-cluster [6627 nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
+ (is (thrown-cause? NotAliveException
+ (.activate nimbus_client "topo-name"))))
+ (.close client))))
+
+(deftest test-noop-authorization-w-simple-transport
+ (with-test-cluster [6628 nil
+ "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+ "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient. storm-conf "localhost" 6628 nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (is (thrown-cause? NotAliveException
+ (.activate nimbus_client "topo-name"))))
+ (.close client))))
+
+(deftest test-deny-authorization-w-simple-transport
+ (with-test-cluster [6629 nil
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ "backtype.storm.security.auth.SimpleTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT 6629
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient/getConfiguredClient storm-conf)
+ nimbus_client (.getClient client)
+ topologyInitialStatus (TopologyInitialStatus/findByValue 2)
+ submitOptions (SubmitOptions. topologyInitialStatus)]
+ (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client "topo-name" nil nil nil)))
+ (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client "topo-name" nil nil nil submitOptions)))
+ (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
+ (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
+ (stubbing [nimbus/check-storm-active! nil
+ nimbus/try-read-storm-conf-from-name {}]
+ (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
+ (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil)))
+ )
+ (stubbing [nimbus/try-read-storm-conf {}]
+ (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
+ (.close client))))
+
+(deftest test-noop-authorization-w-sasl-digest
+ (with-test-cluster [6630
+ "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT 6630
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient/getConfiguredClient storm-conf)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (is (thrown-cause? NotAliveException
+ (.activate nimbus_client "topo-name"))))
+ (.close client))))
+
+(deftest test-deny-authorization-w-sasl-digest
+ (with-test-cluster [6631
+ "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT 6631
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient/getConfiguredClient storm-conf)
+ nimbus_client (.getClient client)
+ topologyInitialStatus (TopologyInitialStatus/findByValue 2)
+ submitOptions (SubmitOptions. topologyInitialStatus)]
+ (is (thrown-cause? AuthorizationException (.submitTopology nimbus_client "topo-name" nil nil nil)))
+ (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client "topo-name" nil nil nil submitOptions)))
+ (is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
+ (is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
+ (is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
+ (is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
+ (stubbing [nimbus/check-storm-active! nil
+ nimbus/try-read-storm-conf-from-name {}]
+ (is (thrown-cause? AuthorizationException (.killTopology nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client "topo-name" (KillOptions.))))
+ (is (thrown-cause? AuthorizationException (.activate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.deactivate nimbus_client "topo-name")))
+ (is (thrown-cause? AuthorizationException (.rebalance nimbus_client "topo-name" nil))))
+ (stubbing [nimbus/try-read-storm-conf {}]
+ (is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client "topo-ID")))
+ (is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client "topo-ID"))))
+ (.close client))))
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/submitter_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/submitter_test.clj b/storm-core/test/clj/backtype/storm/submitter_test.clj
new file mode 100644
index 0000000..6f88297
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/submitter_test.clj
@@ -0,0 +1,75 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.submitter-test
+ (:use [clojure test])
+ (:use [backtype.storm config testing])
+ (:import [backtype.storm StormSubmitter])
+ )
+
+(deftest test-md5-digest-secret-generation
+ (testing "No payload or scheme are generated when already present"
+ (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD "foobar:12345"
+ STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+ result (StormSubmitter/prepareZookeeperAuthentication conf)
+ actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+ actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+ (is (nil? actual-payload))
+ (is (= "digest" actual-scheme))))
+
+ (testing "Scheme is set to digest if not already."
+ (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD "foobar:12345"}
+ result (StormSubmitter/prepareZookeeperAuthentication conf)
+ actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+ actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+ (is (nil? actual-payload))
+ (is (= "digest" actual-scheme))))
+
+ (testing "A payload is generated when no payload is present."
+ (let [conf {STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+ result (StormSubmitter/prepareZookeeperAuthentication conf)
+ actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+ actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+ (is (not (clojure.string/blank? actual-payload)))
+ (is (= "digest" actual-scheme))))
+
+ (testing "A payload is generated when payload is not correctly formatted."
+ (let [bogus-payload "not-a-valid-payload"
+ conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD bogus-payload
+ STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+ result (StormSubmitter/prepareZookeeperAuthentication conf)
+ actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+ actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+ (is (not (StormSubmitter/validateZKDigestPayload bogus-payload))) ; Is this test correct?
+ (is (not (clojure.string/blank? actual-payload)))
+ (is (= "digest" actual-scheme))))
+
+ (testing "A payload is generated when payload is null."
+ (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD nil
+ STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+ result (StormSubmitter/prepareZookeeperAuthentication conf)
+ actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+ actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+ (is (not (clojure.string/blank? actual-payload)))
+ (is (= "digest" actual-scheme))))
+
+ (testing "A payload is generated when payload is blank."
+ (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD ""
+ STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+ result (StormSubmitter/prepareZookeeperAuthentication conf)
+ actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+ actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+ (is (not (clojure.string/blank? actual-payload)))
+ (is (= "digest" actual-scheme)))))