You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:06:57 UTC

[02/14] STORM-216: Added Authentication and Authorization.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index c7d7411..6fdd485 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@ -19,36 +19,40 @@
   (:import [org.apache.thrift TException])
   (:import [org.apache.thrift.transport TTransportException])
   (:import [java.nio ByteBuffer])
+  (:import [java.security Principal AccessController])
+  (:import [javax.security.auth Subject])
+  (:import [java.net InetAddress])
   (:import [backtype.storm Config])
+  (:import [backtype.storm.generated AuthorizationException])
   (:import [backtype.storm.utils NimbusClient])
+  (:import [backtype.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
   (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient 
-                                         ReqContext])
+            ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
   (:use [backtype.storm bootstrap util])
   (:use [backtype.storm.daemon common])
   (:use [backtype.storm bootstrap testing])
-  (:import [backtype.storm.generated Nimbus Nimbus$Client])
-  )
+  (:import [backtype.storm.generated Nimbus Nimbus$Client]))
 
 (bootstrap)
 
-(def nimbus-timeout (Integer. 120))
+(defn mk-principal [name] ; test helper: a java.security.Principal identified solely by `name`
+  (reify Principal
+    (equals [this other] ; nil or a non-Principal compares unequal instead of throwing
+      (and (instance? Principal other) (= name (.getName other))))
+    (getName [this] name)
+    (toString [this] name)
+    (hashCode [this] (.hashCode name))))
+
+(defn mk-subject [name] ; Subject holding a single principal named `name` and two empty credential sets
+  (Subject. true #{(mk-principal name)} #{} #{})) ; leading `true` is the readOnly flag of the javax Subject constructor
 
-(defn mk-authorization-handler [storm-conf]
-  (let [klassname (storm-conf NIMBUS-AUTHORIZER) 
-        aznClass (if klassname (Class/forName klassname))
-        aznHandler (if aznClass (.newInstance aznClass))] 
-    (if aznHandler (.prepare aznHandler storm-conf))
-    (log-debug "authorization class name:" klassname
-               " class:" aznClass
-               " handler:" aznHandler)
-    aznHandler
-    ))
+(def nimbus-timeout (Integer. 120))
 
 (defn nimbus-data [storm-conf inimbus]
   (let [forced-scheduler (.getForcedScheduler inimbus)]
     {:conf storm-conf
      :inimbus inimbus
-     :authorization-handler (mk-authorization-handler storm-conf)
+     :authorization-handler (mk-authorization-handler (storm-conf NIMBUS-AUTHORIZER) storm-conf)
      :submitted-count (atom 0)
      :storm-cluster-state nil
      :submit-lock (Object.)
@@ -61,71 +65,75 @@
      :scheduler nil
      }))
 
-(defn check-authorization! [nimbus storm-name storm-conf operation]
-  (let [aclHandler (:authorization-handler nimbus)]
-    (log-debug "check-authorization with handler: " aclHandler)
-    (if aclHandler
-        (if-not (.permit aclHandler 
-                  (ReqContext/context) 
-                  operation 
-                  (if storm-conf storm-conf {TOPOLOGY-NAME storm-name}))
-          (throw (RuntimeException. (str operation " on topology " storm-name " is not authorized")))
-          ))))
-
-(defn dummy-service-handler [conf inimbus]
-  (let [nimbus (nimbus-data conf inimbus)]
-    (reify Nimbus$Iface
-      (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
-                                     ^SubmitOptions submitOptions]
-        (check-authorization! nimbus storm-name nil "submitTopology"))
-      
-      (^void killTopology [this ^String storm-name]
-        (check-authorization! nimbus storm-name nil "killTopology"))
-      
-      (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
-        (check-authorization! nimbus storm-name nil "killTopology"))
-
-      (^void rebalance [this ^String storm-name ^RebalanceOptions options]
-        (check-authorization! nimbus storm-name nil "rebalance"))
-
-      (activate [this storm-name]
-        (check-authorization! nimbus storm-name nil "activate"))
-
-      (deactivate [this storm-name]
-        (check-authorization! nimbus storm-name nil "deactivate"))
-
-      (beginFileUpload [this])
-
-      (^void uploadChunk [this ^String location ^ByteBuffer chunk])
-
-      (^void finishFileUpload [this ^String location])
-
-      (^String beginFileDownload [this ^String file])
-
-      (^ByteBuffer downloadChunk [this ^String id])
-
-      (^String getNimbusConf [this])
+(defn dummy-service-handler 
+  ([conf inimbus auth-context]
+     (let [nimbus-d (nimbus-data conf inimbus)
+           topo-conf (atom nil)]
+       (reify Nimbus$Iface
+         (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
+                                        ^SubmitOptions submitOptions]
+           (if (not (nil? serializedConf)) (swap! topo-conf (fn [prev new] new) (from-json serializedConf)))
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "submitTopology" auth-context))
+         
+         (^void killTopology [this ^String storm-name]
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "killTopology" auth-context))
+         
+         (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "killTopology" auth-context))
+         
+         (^void rebalance [this ^String storm-name ^RebalanceOptions options]
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "rebalance" auth-context))
+         
+         (activate [this storm-name]
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "activate" auth-context))
+         
+         (deactivate [this storm-name]
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "deactivate" auth-context))
 
-      (^String getTopologyConf [this ^String id])
+         (uploadNewCredentials [this storm-name creds]
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context)) 
+         
+         (beginFileUpload [this])
+         
+         (^void uploadChunk [this ^String location ^ByteBuffer chunk])
+         
+         (^void finishFileUpload [this ^String location])
+         
+         (^String beginFileDownload [this ^String file]
+           (nimbus/check-authorization! nimbus-d nil nil "fileDownload" auth-context)
+           "Done!")
+         
+         (^ByteBuffer downloadChunk [this ^String id])
+         
+         (^String getNimbusConf [this])
+         
+         (^String getTopologyConf [this ^String id])
+         
+         (^StormTopology getTopology [this ^String id])
+         
+         (^StormTopology getUserTopology [this ^String id])
+         
+         (^ClusterSummary getClusterInfo [this])
+         
+         (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
+  ([conf inimbus]
+     (dummy-service-handler conf inimbus nil)))
+     
 
-      (^StormTopology getTopology [this ^String id])
-
-      (^StormTopology getUserTopology [this ^String id])
-
-      (^ClusterSummary getClusterInfo [this])
-      
-      (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
-
-(defn launch-server [server-port login-cfg aznClass transportPluginClass] 
+(defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf] 
   (let [conf1 (merge (read-storm-config)
-                    {NIMBUS-AUTHORIZER aznClass 
-                     NIMBUS-HOST "localhost"
-                     NIMBUS-THRIFT-PORT server-port
-                     STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
-        conf (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
+                     {NIMBUS-AUTHORIZER aznClass 
+                      NIMBUS-HOST "localhost"
+                      NIMBUS-THRIFT-PORT server-port
+                      STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
+        conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
+        conf (if serverConf (merge conf2 serverConf) conf2)
         nimbus (nimbus/standalone-nimbus)
         service-handler (dummy-service-handler conf nimbus)
-        server (ThriftServer. conf (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
+        server (ThriftServer. 
+                conf 
+                (Nimbus$Processor. service-handler) 
+                ThriftConnectionType/NIMBUS)]
     (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
     (.start (Thread. #(.serve server)))
     (wait-for-condition #(.isServing server))
@@ -133,32 +141,172 @@
 
 (defmacro with-server [args & body]
   `(let [server# (launch-server ~@args)]
-      ~@body
-      (.stop server#)
-      ))
+     (try ~@body ; runs body against the launched server; yields the body's value
+          (finally (.stop server#))) ; always stop the server, even when body throws
+     ))
 
-(deftest Simple-authentication-test 
+(deftest kerb-to-local-test ; checks the local name KerberosPrincipalToLocal derives from each principal name
+  (let [to-local (doto (KerberosPrincipalToLocal.) (.prepare {}))
+        local-of #(.toLocal to-local (mk-principal %))]
+    (is (= "me" (local-of "me@realm")))
+    (is (= "simple" (local-of "simple")))
+    (is (= "someone" (local-of "someone/host@realm")))))
+
+(deftest Simple-authentication-test
   (let [a-port (available-port)]
-    (with-server [a-port nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
+    (with-server [a-port nil nil "backtype.storm.security.auth.SimpleTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
             client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
             nimbus_client (.getClient client)]
         (.activate nimbus_client "security_auth_test_topology")
         (.close client))
-
+      
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})]
+                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                               STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) Server: Simple vs. Client: Digest"
           (is (thrown-cause?  java.net.SocketTimeoutException
                               (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
   
+(deftest negative-whitelist-authorization-test ; server runs SimpleWhitelistAuthorizer with no extra server conf
+  (let [a-port (available-port)]
+    (with-server [a-port nil
+                  "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" 
+                  "backtype.storm.testing.SingleUserSimpleTransport" nil]
+      (let [storm-conf (merge (read-storm-config)
+                              {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.testing.SingleUserSimpleTransport"})
+            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
+            nimbus_client (.getClient client)]
+        (testing "(Negative authorization) Authorization plugin should reject client request"
+          (is (thrown-cause? AuthorizationException ; activate must fail with AuthorizationException in the cause chain
+                             (.activate nimbus_client "security_auth_test_topology"))))
+        (.close client)))))
+
+(deftest positive-whitelist-authorization-test ; same setup as the negative case, but the server conf whitelists "user"
+    (let [a-port (available-port)]
+      (with-server [a-port nil
+                    "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" 
+                    "backtype.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
+        (let [storm-conf (merge (read-storm-config)
+                                {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.testing.SingleUserSimpleTransport"})
+              client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout) ; NOTE(review): presumably this transport presents the principal "user" - confirm
+              nimbus_client (.getClient client)]
+          (testing "(Positive authorization) Authorization plugin should accept client request"
+            (.activate nimbus_client "security_auth_test_topology")) ; no `is` form: the check is only that this call does not throw
+          (.close client)))))
+
+(deftest simple-acl-user-auth-test ; SimpleACLAuthorizer decisions for an admin, a supervisor, and two ordinary users
+  (let [cluster-conf (merge (read-storm-config)
+                       {NIMBUS-ADMINS ["admin"]
+                        NIMBUS-SUPERVISOR-USERS ["supervisor"]})
+        authorizer (SimpleACLAuthorizer. )
+        admin-user (mk-subject "admin")
+        supervisor-user (mk-subject "supervisor")
+        user-a (mk-subject "user-a")
+        user-b (mk-subject "user-b")]
+  (.prepare authorizer cluster-conf) 
+  (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
+  (is (= true (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "submitTopology" {})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "fileUpload" nil)))
+  (is (= true (.permit authorizer (ReqContext. user-b) "fileUpload" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "fileUpload" nil)))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "getNimbusConf" nil)))
+  (is (= true (.permit authorizer (ReqContext. user-b) "getNimbusConf" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getNimbusConf" nil)))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "getClusterInfo" nil)))
+  (is (= true (.permit authorizer (ReqContext. user-b) "getClusterInfo" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getClusterInfo" nil)))
+
+  (is (= false (.permit authorizer (ReqContext. user-a) "fileDownload" nil)))
+  (is (= false (.permit authorizer (ReqContext. user-b) "fileDownload" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
+  (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "activate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "activate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+
+  (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+))
+
+(deftest simple-acl-same-user-auth-test ; one principal configured as both admin and supervisor; every operation below is expected to be permitted
+  (let [cluster-conf (merge (read-storm-config)
+                       {NIMBUS-ADMINS ["admin"]
+                        NIMBUS-SUPERVISOR-USERS ["admin"]}) ; same name as in NIMBUS-ADMINS
+        authorizer (SimpleACLAuthorizer. )
+        admin-user (mk-subject "admin")]
+  (.prepare authorizer cluster-conf) 
+  (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]}))) ; admin is not listed in TOPOLOGY-USERS here or below
+  (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
+  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
+))
+
+
 (deftest positive-authorization-test 
   (let [a-port (available-port)]
-    (with-server [a-port nil 
+    (with-server [a-port nil
                   "backtype.storm.security.auth.authorizer.NoopAuthorizer" 
-                  "backtype.storm.security.auth.SimpleTransportPlugin"]
+                  "backtype.storm.security.auth.SimpleTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
             client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
@@ -171,7 +319,7 @@
   (let [a-port (available-port)]
     (with-server [a-port nil
                   "backtype.storm.security.auth.authorizer.DenyAuthorizer" 
-                  "backtype.storm.security.auth.SimpleTransportPlugin"]
+                  "backtype.storm.security.auth.SimpleTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
                                Config/NIMBUS_HOST "localhost"
@@ -180,19 +328,20 @@
             client (NimbusClient/getConfiguredClient storm-conf)
             nimbus_client (.getClient client)]
         (testing "(Negative authorization) Authorization plugin should reject client request"
-          (is (thrown? TTransportException
-                       (.activate nimbus_client "security_auth_test_topology"))))
+          (is (thrown-cause? AuthorizationException
+                             (.activate nimbus_client "security_auth_test_topology"))))
         (.close client)))))
 
 (deftest digest-authentication-test
   (let [a-port (available-port)]
-    (with-server [a-port  
+    (with-server [a-port
                   "test/clj/backtype/storm/security/auth/jaas_digest.conf" 
                   nil
-                  "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+                  "backtype.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})
+                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                               STORM-NIMBUS-RETRY-TIMES 0})
             client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
             nimbus_client (.getClient client)]
         (testing "(Positive authentication) valid digest authentication"
@@ -200,7 +349,8 @@
         (.close client))
       
       (let [storm-conf (merge (read-storm-config)
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+                              {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+                               STORM-NIMBUS-RETRY-TIMES 0})
             client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
             nimbus_client (.getClient client)]
         (testing "(Negative authentication) Server: Digest vs. Client: Simple"
@@ -210,34 +360,37 @@
       
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"})]
+                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"
+                               STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) Invalid  password"
-          (is (thrown? TTransportException
-                       (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
+          (is (thrown-cause? TTransportException
+                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
       
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"})]
+                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"
+                               STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) Unknown user"
-          (is (thrown? TTransportException 
-                       (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-      
+          (is (thrown-cause? TTransportException 
+                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
+
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/nonexistent.conf"})]
+                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/nonexistent.conf"
+                               STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) nonexistent configuration file"
-          (is (thrown? RuntimeException 
-                       (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
+          (is (thrown-cause? RuntimeException 
+                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
       
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"})]
+                               "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"
+                               STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) Missing client"
           (is (thrown-cause? java.io.IOException
                              (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
-  
-  
+        
 (deftest test-GetTransportPlugin-throws-RuntimeException
   (let [conf (merge (read-storm-config)
                     {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
-    (is (thrown? RuntimeException (AuthUtils/GetTransportPlugin conf nil)))))
+    (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj b/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj
new file mode 100644
index 0000000..d592a40
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/authorizer/DRPCSimpleAclAuthorizer_test.clj
@@ -0,0 +1,226 @@
+(ns backtype.storm.security.auth.authorizer.DRPCSimpleAclAuthorizer-test
+  (:use [clojure test])
+  (:import [org.mockito Mockito])
+  (:import [backtype.storm Config])
+  (:import [backtype.storm.security.auth ReqContext SingleUserPrincipal])
+  (:import [backtype.storm.security.auth.authorizer DRPCSimpleACLAuthorizer])
+  (:use [backtype.storm config util])
+  )
+
+;; Builds a Mockito mock of ReqContext whose .principal call is stubbed to
+;; return a SingleUserPrincipal constructed from `user`.
+(defn- mk-mock-context [user]
+  (let [ctx (Mockito/mock ReqContext)
+        stubbing (Mockito/when (.principal ctx))]
+    (.thenReturn stubbing (SingleUserPrincipal. user))
+    ctx))
+
+;; Fixtures shared by every deftest defined inside this let:
+;;   * function / partial-function - DRPC function names passed under
+;;     DRPCSimpleACLAuthorizer/FUNCTION_KEY.
+;;   * *-context - mocked ReqContexts, one per user name (alice is also
+;;     mocked a second time with a realm-qualified principal name).
+;;   * strict-handler / permissive-handler - two DRPCSimpleACLAuthorizer
+;;     instances prepared with the same ACL file and principal-to-local
+;;     plugin, differing only in DRPC-AUTHORIZER-ACL-STRICT (true vs false).
+;; NOTE(review): the ACL entries themselves live in acl-file, which is not
+;; part of this file - confirm user/function expectations against it.
+(let [function "jump"
+      partial-function "partial"
+      alice-context (mk-mock-context "alice")
+      alice-kerb-context (mk-mock-context "alice@SOME.RELM")
+      bob-context (mk-mock-context "bob")
+      charlie-context (mk-mock-context "charlie")
+      acl-file "drpc-simple-acl-test-scenario.yaml"
+      strict-handler (doto (DRPCSimpleACLAuthorizer.)
+                           (.prepare {DRPC-AUTHORIZER-ACL-STRICT true
+                                      DRPC-AUTHORIZER-ACL-FILENAME acl-file
+                                      STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.KerberosPrincipalToLocal"}))
+      permissive-handler (doto (DRPCSimpleACLAuthorizer.)
+                               (.prepare {DRPC-AUTHORIZER-ACL-STRICT false
+                                          DRPC-AUTHORIZER-ACL-FILENAME acl-file
+                                          STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.KerberosPrincipalToLocal"}))]
+
+  ;; Checks strict-handler decisions for the partial-function entry: the
+  ;; current ReqContext is denied execute, the realm-qualified alice principal
+  ;; is permitted execute, and that same principal is denied fetchRequest.
+  (deftest test-partial-authorization
+    (let [request {DRPCSimpleACLAuthorizer/FUNCTION_KEY partial-function}]
+      (testing "deny execute to unauthorized user"
+        (is (not (.permit strict-handler (ReqContext/context) "execute" request))))
+
+      (testing "allow execute to authorized kerb user for correct function"
+        (is (.permit strict-handler alice-kerb-context "execute" request)))
+
+      (testing "deny fetchRequest to unauthorized user for correct function"
+        (is (not (.permit strict-handler alice-kerb-context "fetchRequest" request))))))
+
+  ;; Client-side ("execute") decisions of strict-handler: the current
+  ;; ReqContext is denied, alice is denied for a function name other than
+  ;; `function`, and both alice principals are permitted for `function`.
+  (deftest test-client-authorization-strict
+    (let [jump-args {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}
+          wrong-args {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"}]
+      (testing "deny execute to unauthorized user"
+        (is (not (.permit strict-handler (ReqContext/context) "execute" jump-args))))
+
+      (testing "deny execute to valid user for incorrect function"
+        (is (not (.permit strict-handler alice-context "execute" wrong-args))))
+
+      (testing "allow execute to authorized kerb user for correct function"
+        (is (.permit strict-handler alice-kerb-context "execute" jump-args)))
+
+      (testing "allow execute to authorized user for correct function"
+        (is (.permit strict-handler alice-context "execute" jump-args)))))
+
+
+  ;; Client-side ("execute") decisions of permissive-handler: the current
+  ;; ReqContext is still denied for `function`, while both alice principals
+  ;; are permitted for a function name other than `function`, and bob is
+  ;; permitted for `function`.
+  ;; Each testing context carries a distinct description so that a failure
+  ;; report identifies which principal form (plain vs. realm-qualified) failed.
+  (deftest test-client-authorization-permissive
+    (testing "deny execute to unauthorized user for correct function"
+      (is (not
+            (.permit permissive-handler
+                     (ReqContext/context)
+                     "execute"
+                     {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+    (testing "allow execute for user for incorrect function when permissive"
+      (is
+        (.permit permissive-handler
+                 alice-context
+                 "execute"
+                 {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"})))
+
+    (testing "allow execute for kerb user for incorrect function when permissive"
+      (is
+        (.permit permissive-handler
+                 alice-kerb-context
+                 "execute"
+                 {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"})))
+
+    (testing "allow execute to authorized user for correct function"
+      (is
+        (.permit permissive-handler
+                 bob-context
+                 "execute"
+                 {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+  ;; For each invocation-side operation, strict-handler must deny alice for
+  ;; `function`, deny charlie for a function name other than `function`, and
+  ;; permit charlie for `function`.
+  ;; The three testing contexts are siblings: each one is closed before the
+  ;; next begins, so a failure is reported under its own description only.
+  (deftest test-invocation-authorization-strict
+    (doseq [operation ["fetchRequest" "failRequest" "result"]]
+
+      (testing (str "deny " operation
+                    " to unauthorized user for correct function")
+        (is (not
+              (.permit
+                strict-handler
+                alice-context
+                operation
+                {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}))))
+
+      (testing (str "deny " operation
+                    " to user for incorrect function when strict")
+        (is (not
+              (.permit
+                strict-handler
+                charlie-context
+                operation
+                {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"}))))
+
+      (testing (str "allow " operation
+                    " to authorized user for correct function")
+        (is
+          (.permit
+            strict-handler
+            charlie-context
+            operation
+            {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))))
+
+  ;; For each invocation-side operation, permissive-handler must deny bob for
+  ;; `function`, permit charlie for a function name other than `function`,
+  ;; and permit charlie for `function`.
+  (deftest test-invocation-authorization-permissive
+    (let [jump-args {DRPCSimpleACLAuthorizer/FUNCTION_KEY function}
+          wrong-args {DRPCSimpleACLAuthorizer/FUNCTION_KEY "wrongFunction"}]
+      (doseq [operation ["fetchRequest" "failRequest" "result"]]
+        (testing (str "deny " operation
+                      " to unauthorized user for correct function")
+          (is (not (.permit permissive-handler bob-context operation jump-args))))
+
+        (testing (str "allow " operation
+                      " to user for incorrect function when permissive")
+          (is (.permit permissive-handler charlie-context operation wrong-args)))
+
+        (testing (str operation " is allowed for authorized user")
+          (is (.permit permissive-handler charlie-context operation jump-args))))))
+
+  ;; Both handlers must deny "execute" when the parameter map carries no
+  ;; function: either the FUNCTION_KEY entry is absent or its value is nil.
+  ;; Combinations are checked in the order strict/{}, strict/nil,
+  ;; permissive/{}, permissive/nil.
+  (deftest test-deny-when-no-function-given
+    (doseq [[^DRPCSimpleACLAuthorizer handler context] [[strict-handler alice-context]
+                                                        [permissive-handler bob-context]]
+            params [{} {DRPCSimpleACLAuthorizer/FUNCTION_KEY nil}]]
+      (is (not (.permit handler context "execute" params)))))
+
+  ;; Both handlers must deny "execute" on `function` when the request context
+  ;; does not identify a user: either a ReqContext mock with no stubbing
+  ;; applied in this file, or a nil context.
+  ;; NOTE(review): what an unstubbed mock returns from .principal is Mockito
+  ;; default behaviour (presumably null) - confirm.
+  ;; The final paren on the last line closes the enclosing fixture let.
+  (deftest test-deny-when-invalid-user-given
+    ;; strict handler, unstubbed mock context
+    (is (not
+          (.permit
+            strict-handler
+            (Mockito/mock ReqContext)
+            "execute"
+            {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+    ;; strict handler, nil context
+    (is (not
+          (.permit
+            strict-handler
+            nil
+            "execute"
+            {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+    ;; permissive handler, unstubbed mock context
+    (is (not
+          (.permit
+            permissive-handler
+            (Mockito/mock ReqContext)
+            "execute"
+            {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))
+
+    ;; permissive handler, nil context
+    (is (not
+          (.permit
+            permissive-handler
+            nil
+            "execute"
+            {DRPCSimpleACLAuthorizer/FUNCTION_KEY function})))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj
new file mode 100644
index 0000000..2056509
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/auto_login_module_test.clj
@@ -0,0 +1,91 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.security.auth.auto-login-module-test
+  (:use [clojure test])
+  (:use [backtype.storm util])
+  (:import [backtype.storm.security.auth.kerberos AutoTGT
+            AutoTGTKrb5LoginModule AutoTGTKrb5LoginModuleTest])
+  (:import [javax.security.auth Subject Subject])
+  (:import [javax.security.auth.kerberos KerberosTicket])
+  (:import [org.mockito Mockito])
+  )
+
+;; A freshly constructed AutoTGTKrb5LoginModule (never initialized, no ticket):
+;; login is expected to raise LoginException, commit and abort to return a
+;; falsey value, and logout to return a truthy value.
+(deftest login-module-no-subj-no-tgt-test
+  (testing "Behavior is correct when there is no Subject or TGT"
+    (let [module (AutoTGTKrb5LoginModule.)]
+      (is (thrown-cause? javax.security.auth.login.LoginException (.login module)))
+      (is (not (.commit module)))
+      (is (not (.abort module)))
+      (is (.logout module)))))
+
+;; Module initialized with a read-only Subject and no ticket: commit is
+;; expected to return a falsey value and logout a truthy one.
+(deftest login-module-readonly-subj-no-tgt-test
+  (testing "Behavior is correct when there is a read-only Subject and no TGT"
+    (let [module (AutoTGTKrb5LoginModule.)]
+      (.initialize module (Subject. true #{} #{} #{}) nil nil nil)
+      (is (not (.commit module)))
+      (is (.logout module)))))
+
+;; Module initialized with an empty Subject and no ticket: login is expected
+;; to raise LoginException, commit and abort to return a falsey value, and
+;; logout to return a truthy value.
+(deftest login-module-with-subj-no-tgt-test
+  (testing "Behavior is correct when there is a Subject and no TGT"
+    (let [module (doto (AutoTGTKrb5LoginModule.)
+                   (.initialize (Subject.) nil nil nil))]
+      (is (thrown-cause? javax.security.auth.login.LoginException (.login module)))
+      (is (not (.commit module)))
+      (is (not (.abort module)))
+      (is (.logout module)))))
+
+;; Test module given a mocked ticket but never initialized with a Subject:
+;; login is expected to succeed and commit to raise LoginException. A fresh
+;; mocked ticket is then supplied before checking that abort and logout both
+;; return a truthy value.
+(deftest login-module-no-subj-with-tgt-test
+  (testing "Behavior is correct when there is no Subject and a TGT"
+    (let [module (AutoTGTKrb5LoginModuleTest.)
+          new-ticket #(Mockito/mock KerberosTicket)]
+      (.setKerbTicket module (new-ticket))
+      (is (.login module))
+      (is (thrown-cause? javax.security.auth.login.LoginException (.commit module)))
+
+      (.setKerbTicket module (new-ticket))
+      (is (.abort module))
+      (is (.logout module)))))
+
+;; Test module initialized with a read-only Subject and given a mocked ticket:
+;; login is expected to succeed and commit to raise LoginException. A fresh
+;; mocked ticket is then supplied before checking that abort and logout both
+;; return a truthy value.
+(deftest login-module-readonly-subj-with-tgt-test
+  (testing "Behavior is correct when there is a read-only Subject and a TGT"
+    (let [module (AutoTGTKrb5LoginModuleTest.)
+          new-ticket #(Mockito/mock KerberosTicket)]
+      (.initialize module (Subject. true #{} #{} #{}) nil nil nil)
+      (.setKerbTicket module (new-ticket))
+      (is (.login module))
+      (is (thrown-cause? javax.security.auth.login.LoginException (.commit module)))
+
+      (.setKerbTicket module (new-ticket))
+      (is (.abort module))
+      (is (.logout module)))))
+
+;; Test module with its `client` field set to a mocked Principal, initialized
+;; with an empty Subject and given a mocked ticket: login, commit, abort and
+;; logout are all expected to return a truthy value.
+(deftest login-module-with-subj-and-tgt
+  (testing "Behavior is correct when there is a Subject and a TGT"
+    (let [module (AutoTGTKrb5LoginModuleTest.)
+          tgt (Mockito/mock KerberosTicket)]
+      (set! (. module client) (Mockito/mock java.security.Principal))
+      (.initialize module (Subject.) nil nil nil)
+      (.setKerbTicket module tgt)
+      (is (.login module))
+      (is (.commit module))
+      (is (.abort module))
+      (is (.logout module)))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas
new file mode 100644
index 0000000..cd691ae
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas
@@ -0,0 +1,5 @@
+StormClient {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="alice"
+       password="poorpasswordforalice";
+};

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas
new file mode 100644
index 0000000..e4ca097
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas
@@ -0,0 +1,5 @@
+StormClient {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="bob"
+       password="poorpasswordforbob";
+};

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas
new file mode 100644
index 0000000..3473d6d
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas
@@ -0,0 +1,5 @@
+StormClient {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="charlie"
+       password="poorpasswordforcharlie";
+};

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas
new file mode 100644
index 0000000..3b22d21
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc-auth-server.jaas
@@ -0,0 +1,6 @@
+StormServer {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_alice="poorpasswordforalice"
+       user_bob="poorpasswordforbob"
+       user_charlie="poorpasswordforcharlie";
+};

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj
new file mode 100644
index 0000000..ff431ec
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/drpc_auth_test.clj
@@ -0,0 +1,315 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.security.auth.drpc-auth-test
+  (:use [clojure test])
+  (:require [backtype.storm.daemon [drpc :as drpc]])
+  (:import [backtype.storm.generated AuthorizationException
+            DRPCExecutionException DistributedRPC$Processor
+            DistributedRPCInvocations$Processor])
+  (:import [backtype.storm Config])
+  (:import [backtype.storm.security.auth ReqContext SingleUserPrincipal ThriftServer ThriftConnectionType])
+  (:import [backtype.storm.utils DRPCClient])
+  (:import [backtype.storm.drpc DRPCInvocationsClient])
+  (:import [java.util.concurrent TimeUnit])
+  (:import [javax.security.auth Subject])
+  (:use [backtype.storm bootstrap util])
+  (:use [backtype.storm.daemon common])
+  (:use [backtype.storm bootstrap testing])
+  )
+
+(bootstrap)
+
+;; NOTE(review): no test in this file refers to drpc-timeout (the tests bind
+;; their own drpc-timeout-seconds) - confirm whether it is still needed.
+(def drpc-timeout (Integer. 30))
+
+;; Starts the two DRPC Thrift servers used by the tests in this file.
+;;   conf                 - base storm configuration map
+;;   drpcAznClass         - value for DRPC-AUTHORIZER, skipped when nil
+;;   transportPluginClass - value for STORM-THRIFT-TRANSPORT-PLUGIN, skipped when nil
+;;   login-cfg            - value for "java.security.auth.login.config", skipped when nil
+;;   client-port          - value for DRPC-PORT
+;;   invocations-port     - value for DRPC-INVOCATIONS-PORT
+;; Registers a JVM shutdown hook that stops both servers, starts each server
+;; on its own thread (invocations server first), waits until each reports
+;; isServing, and returns [handler-server invoke-server].
+(defn launch-server [conf drpcAznClass transportPluginClass login-cfg client-port invocations-port]
+  (let [conf (merge conf
+                    (when drpcAznClass {DRPC-AUTHORIZER drpcAznClass})
+                    (when transportPluginClass {STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
+                    (when login-cfg {"java.security.auth.login.config" login-cfg})
+                    {DRPC-PORT client-port
+                     DRPC-INVOCATIONS-PORT invocations-port})
+        service-handler (drpc/service-handler conf)
+        handler-server (ThriftServer. conf
+                                      (DistributedRPC$Processor. service-handler)
+                                      ThriftConnectionType/DRPC)
+        invoke-server (ThriftServer. conf
+                                     (DistributedRPCInvocations$Processor. service-handler)
+                                     ThriftConnectionType/DRPC_INVOCATIONS)
+        stop-both (fn []
+                    (.stop handler-server)
+                    (.stop invoke-server))]
+    (.addShutdownHook (Runtime/getRuntime) (Thread. stop-both))
+    (log-message "storm conf:" conf)
+
+    (log-message "Starting DRPC invocation server ...")
+    (.start (Thread. (fn [] (.serve invoke-server))))
+    (wait-for-condition (fn [] (.isServing invoke-server)))
+
+    (log-message "Starting DRPC handler server ...")
+    (.start (Thread. (fn [] (.serve handler-server))))
+    (wait-for-condition (fn [] (.isServing handler-server)))
+
+    [handler-server invoke-server]))
+
+;; Runs `body` with the DRPC servers started by (launch-server ~@args), where
+;; `args` is a vector of launch-server arguments.
+;; The servers are stopped in a finally clause so that an exception escaping
+;; `body` cannot leave them (and their ports) running for later tests; the
+;; nested try ensures the invocations server is stopped even if stopping the
+;; handler server throws. Evaluates to the value of the last body form.
+(defmacro with-server [args & body]
+  `(let [[handler-server# invoke-server#] (launch-server ~@args)]
+     (try
+       ~@body
+       (finally
+         (log-message "Stopping DRPC servers ...")
+         (try
+           (.stop handler-server#)
+           (finally
+             (.stop invoke-server#)))))))
+
+;; With DenyAuthorizer configured and no transport plugin or login config
+;; overrides, both execute on the DRPC client and fetchRequest on the
+;; invocations client are expected to throw AuthorizationException.
+(deftest deny-drpc-test
+  (let [client-port (available-port)
+        invocations-port (available-port (inc client-port))
+        storm-conf (read-storm-config)
+        authorizer "backtype.storm.security.auth.authorizer.DenyAuthorizer"]
+    (with-server [storm-conf authorizer nil nil client-port invocations-port]
+      (let [drpc-conn (DRPCClient. storm-conf "localhost" client-port)
+            exec-api (.getClient drpc-conn)
+            invok-conn (DRPCInvocationsClient. storm-conf "localhost" invocations-port)
+            invok-api (.getClient invok-conn)]
+        (is (thrown? AuthorizationException
+                     (.execute exec-api "func-foo" "args-bar")))
+        (is (thrown? AuthorizationException
+                     (.fetchRequest invok-api nil)))
+        (.close drpc-conn)
+        (.close invok-conn)))))
+
+;; Same expectations as the plain deny test, but server and clients are both
+;; configured with the digest SASL transport plugin and the jaas_digest.conf
+;; login configuration: execute and fetchRequest are expected to throw
+;; AuthorizationException.
+(deftest deny-drpc-digest-test
+  (let [client-port (available-port)
+        invocations-port (available-port (inc client-port))
+        storm-conf (read-storm-config)
+        digest-plugin "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+        jaas-conf "test/clj/backtype/storm/security/auth/jaas_digest.conf"]
+    (with-server [storm-conf "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+                  digest-plugin
+                  jaas-conf
+                  client-port invocations-port]
+      (let [client-conf (merge storm-conf
+                               {STORM-THRIFT-TRANSPORT-PLUGIN digest-plugin
+                                "java.security.auth.login.config" jaas-conf})
+            drpc-conn (DRPCClient. client-conf "localhost" client-port)
+            exec-api (.getClient drpc-conn)
+            invok-conn (DRPCInvocationsClient. client-conf "localhost" invocations-port)
+            invok-api (.getClient invok-conn)]
+        (is (thrown? AuthorizationException
+                     (.execute exec-api "func-foo" "args-bar")))
+        (is (thrown? AuthorizationException
+                     (.fetchRequest invok-api nil)))
+        (.close drpc-conn)
+        (.close invok-conn)))))
+
+;; Sets up the ACL test scenario and runs `body` inside it.
+;;   strict?        - expression used as the DRPC-AUTHORIZER-ACL-STRICT value
+;;   alice-client, bob-client, charlie-client
+;;                  - symbols bound to DRPCClients logging in with the alice,
+;;                    bob and charlie JAAS files respectively
+;;   alice-invok, charlie-invok
+;;                  - symbols bound to DRPCInvocationsClients logging in with
+;;                    the alice and charlie JAAS files respectively
+;; The servers are started through with-server using DRPCSimpleACLAuthorizer,
+;; the digest SASL transport plugin and drpc-auth-server.jaas. All five
+;; clients are closed in a finally clause once `body` finishes.
+;;
+;; Port selection and configuration loading are emitted into the expansion
+;; (auto-gensym locals) so that they run when the test executes, not when the
+;; macro is expanded at load time; ports probed at load time may no longer be
+;; free by the time the test actually starts its servers.
+(defmacro with-simple-drpc-test-scenario
+  [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
+  `(let [client-port# (available-port)
+         invocations-port# (available-port (inc client-port#))
+         storm-conf# (merge (read-storm-config)
+                            {DRPC-AUTHORIZER-ACL-STRICT ~strict?
+                             DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
+                             STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"})]
+     (with-server [storm-conf#
+                   "backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
+                   "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+                   "test/clj/backtype/storm/security/auth/drpc-auth-server.jaas"
+                   client-port# invocations-port#]
+       (let [~alice-client (DRPCClient.
+                            (merge storm-conf# {"java.security.auth.login.config"
+                                                "test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas"})
+                            "localhost"
+                            client-port#)
+             ~bob-client (DRPCClient.
+                          (merge storm-conf# {"java.security.auth.login.config"
+                                              "test/clj/backtype/storm/security/auth/drpc-auth-bob.jaas"})
+                          "localhost"
+                          client-port#)
+             ~charlie-client (DRPCClient.
+                              (merge storm-conf# {"java.security.auth.login.config"
+                                                  "test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas"})
+                              "localhost"
+                              client-port#)
+             ~alice-invok (DRPCInvocationsClient.
+                           (merge storm-conf# {"java.security.auth.login.config"
+                                               "test/clj/backtype/storm/security/auth/drpc-auth-alice.jaas"})
+                           "localhost"
+                           invocations-port#)
+             ~charlie-invok (DRPCInvocationsClient.
+                             (merge storm-conf# {"java.security.auth.login.config"
+                                                 "test/clj/backtype/storm/security/auth/drpc-auth-charlie.jaas"})
+                             "localhost"
+                             invocations-port#)]
+         (try
+           ~@body
+           (finally
+             (.close ~alice-client)
+             (.close ~bob-client)
+             (.close ~charlie-client)
+             (.close ~alice-invok)
+             (.close ~charlie-invok)))))))
+
+;; End-to-end per-function authorization checks against live DRPC servers,
+;; with `true` passed as the scenario's strict? flag.
+;; Most sections follow the same shape: a future issues .execute from a
+;; client, an invocations client polls .fetchRequest until a non-empty request
+;; id is returned (the polling and follow-up steps are wrapped in
+;; with-timeout), and the request is then completed with .result or
+;; .failRequest while the assertions check who may do what.
+(deftest drpc-per-function-auth-strict-test
+  (with-simple-drpc-test-scenario [true alice-client bob-client charlie-client alice-invok charlie-invok]
+    (let [drpc-timeout-seconds 10]
+      (testing "Permitted user can execute a function in the ACL"
+        (let [func "jump"
+              exec-ftr (future (.execute alice-client func "some args"))
+              id (atom "")
+              expected "Authorized DRPC"]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            ;; Poll until charlie's fetchRequest yields a non-empty request id.
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (.result charlie-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "execute fails when function is not in ACL"
+        (is (thrown-cause? AuthorizationException
+          (.execute alice-client "jog" "some args"))))
+
+      (testing "fetchRequest fails when function is not in ACL"
+        (is (thrown-cause? AuthorizationException
+          (.fetchRequest charlie-invok "jog"))))
+
+      (testing "authorized user can fail a request"
+        (let [func "jump"
+              exec-ftr (future (.execute alice-client func "some args"))
+              id (atom "")]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (.failRequest charlie-invok @id)
+          (is (thrown-cause? DRPCExecutionException
+                             (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "unauthorized invocation user is denied returning a result"
+        (let [func "jump"
+              exec-ftr (future (.execute bob-client func "some args"))
+              id (atom "")
+              expected "Only Authorized User can populate the result"]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          ;; alice's attempt must be rejected; charlie then supplies the
+          ;; result that the executing client is expected to receive.
+          (is (thrown-cause? AuthorizationException
+            (.result alice-invok @id "not the expected result")))
+          (.result charlie-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "unauthorized invocation user is denied failing a request"
+        (let [func "jump"
+              exec-ftr (future (.execute alice-client func "some args"))
+              id (atom "")]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (is (thrown-cause? AuthorizationException
+            (.failRequest alice-invok @id)))
+          ;; The pending request is failed by charlie after alice's attempt.
+          (.failRequest charlie-invok @id))))
+
+      (testing "unauthorized invocation user is denied fetching a request"
+        (let [func "jump"
+              exec-ftr (future (.execute bob-client func "some args"))
+              id (atom "")
+              expected "Only authorized users can fetchRequest"]
+          ;; NOTE(review): presumably gives the execute future time to reach
+          ;; the server before alice's fetch is attempted - confirm.
+          (Thread/sleep 1000)
+          (is (thrown-cause? AuthorizationException
+            (-> alice-invok (.fetchRequest func) .get_request_id)))
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (.result charlie-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))
+
+;; End-to-end per-function authorization checks against live DRPC servers,
+;; with `false` passed as the scenario's strict? flag.
+;; In addition to the "jump" sections, two sections use the function name
+;; "jog" and expect execute, fetchRequest, result and failRequest to go
+;; through for it.
+;; Most sections follow the same shape: a future issues .execute from a
+;; client, an invocations client polls .fetchRequest until a non-empty request
+;; id is returned (the polling and follow-up steps are wrapped in
+;; with-timeout), and the request is then completed with .result or
+;; .failRequest while the assertions check who may do what.
+(deftest drpc-per-function-auth-non-strict-test
+  (with-simple-drpc-test-scenario [false alice-client bob-client charlie-client alice-invok charlie-invok]
+    (let [drpc-timeout-seconds 10]
+      (testing "Permitted user can execute a function in the ACL"
+        (let [func "jump"
+              exec-ftr (future (.execute alice-client func "some args"))
+              id (atom "")
+              expected "Authorized DRPC"]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            ;; Poll until charlie's fetchRequest yields a non-empty request id.
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (.result charlie-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "DRPC succeeds for anyone when function is not in ACL"
+        (let [func "jog"
+              exec-ftr (future (.execute charlie-client func "some args"))
+              id (atom "")
+              expected "Permissive/No ACL Entry"]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            ;; Here the roles are swapped: charlie executes, alice serves.
+            (while (empty? @id)
+              (reset! id
+                      (-> alice-invok (.fetchRequest func) .get_request_id)))
+          (.result alice-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "failure of a request is allowed when function is not in ACL"
+        (let [func "jog"
+              exec-ftr (future (.execute charlie-client func "some args"))
+              id (atom "")]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> alice-invok (.fetchRequest func) .get_request_id)))
+          (.failRequest alice-invok @id)
+          (is (thrown-cause? DRPCExecutionException
+                             (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "authorized user can fail a request"
+        (let [func "jump"
+              exec-ftr (future (.execute alice-client func "some args"))
+              id (atom "")]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (.failRequest charlie-invok @id)
+          (is (thrown-cause? DRPCExecutionException
+                             (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "unauthorized invocation user is denied returning a result"
+        (let [func "jump"
+              exec-ftr (future (.execute bob-client func "some args"))
+              id (atom "")
+              expected "Only Authorized User can populate the result"]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          ;; alice's attempt must be rejected; charlie then supplies the
+          ;; result that the executing client is expected to receive.
+          (is (thrown-cause? AuthorizationException
+            (.result alice-invok @id "not the expected result")))
+          (.result charlie-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
+
+      (testing "unauthorized invocation user is denied failing a request"
+        (let [func "jump"
+              exec-ftr (future (.execute alice-client func "some args"))
+              id (atom "")]
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (is (thrown-cause? AuthorizationException
+            (.failRequest alice-invok @id)))
+          ;; The pending request is failed by charlie after alice's attempt.
+          (.failRequest charlie-invok @id))))
+
+      (testing "unauthorized invocation user is denied fetching a request"
+        (let [func "jump"
+              exec-ftr (future (.execute bob-client func "some args"))
+              id (atom "")
+              expected "Only authorized users can fetchRequest"]
+          ;; NOTE(review): presumably gives the execute future time to reach
+          ;; the server before alice's fetch is attempted - confirm.
+          (Thread/sleep 1000)
+          (is (thrown-cause? AuthorizationException
+            (-> alice-invok (.fetchRequest func) .get_request_id)))
+          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
+            (while (empty? @id)
+              (reset! id
+                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
+          (.result charlie-invok @id expected)
+          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
new file mode 100644
index 0000000..bb70239
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
@@ -0,0 +1,181 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.security.auth.nimbus-auth-test
+  (:use [clojure test])
+  (:require [backtype.storm [testing :as testing]])
+  (:require [backtype.storm.daemon [nimbus :as nimbus]])
+  (:require [backtype.storm [zookeeper :as zk]])
+  (:import [java.nio ByteBuffer])
+  (:import [backtype.storm Config])
+  (:import [backtype.storm.utils NimbusClient])
+  (:import [backtype.storm.generated NotAliveException])
+  (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient 
+                                         ReqContext ThriftConnectionType])
+  (:use [backtype.storm bootstrap cluster util])
+  (:use [backtype.storm.daemon common nimbus])
+  (:use [backtype.storm bootstrap])
+  (:import [backtype.storm.generated Nimbus Nimbus$Client 
+            AuthorizationException SubmitOptions TopologyInitialStatus KillOptions])
+  (:require [conjure.core])
+  (:use [conjure core])
+  )
+
+(bootstrap) ; NOTE(review): presumably pulls in the shared imports/uses from backtype.storm.bootstrap -- confirm
+
+(def nimbus-timeout (Integer. 30)) ; last NimbusClient constructor argument in the tests below; unit not shown here -- TODO confirm
+
+(defn launch-test-cluster [nimbus-port login-cfg aznClass transportPluginClass]
+  ;; Boots a supervisor-less local cluster and serves its nimbus over Thrift; returns [cluster-map server].
+  (let [base-conf {NIMBUS-AUTHORIZER aznClass
+                   NIMBUS-THRIFT-PORT nimbus-port
+                   STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass}
+        daemon-conf (if-not login-cfg
+                      base-conf
+                      (merge base-conf {"java.security.auth.login.config" login-cfg}))
+        local-cluster (testing/mk-local-storm-cluster :supervisors 0 :ports-per-supervisor 0 :daemon-conf daemon-conf)
+        processor (Nimbus$Processor. (:nimbus local-cluster))
+        server (ThriftServer. (:daemon-conf local-cluster) processor ThriftConnectionType/NIMBUS)]
+    (.addShutdownHook (Runtime/getRuntime) (Thread. #(.stop server))) ; JVM shutdown hook also stops the server
+    (.start (Thread. (fn [] (.serve server))))
+    (wait-for-condition (fn [] (.isServing server)))
+    [local-cluster server]))
+
+(defmacro with-test-cluster [args & body] ; args: [nimbus-port login-cfg aznClass transportPluginClass], spliced into launch-test-cluster
+  `(let [[cluster-map# nimbus-server#] (launch-test-cluster ~@args)]
+     (try ~@body ; body runs against the live cluster; the teardown below runs even if it throws
+       (finally (log-debug "Shutdown cluster from macro")
+                (try (testing/kill-local-storm-cluster cluster-map#)
+                  (finally (.stop nimbus-server#)))))))
+
+(deftest Simple-authentication-test
+  ;; No authorizer configured: activate is expected to raise NotAliveException rather than an auth failure.
+  (with-test-cluster [6627 nil nil "backtype.storm.security.auth.SimpleTransportPlugin"]
+    (let [overrides {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+                     STORM-NIMBUS-RETRY-TIMES 0}
+          client-conf (merge (read-storm-config) overrides)
+          conn (NimbusClient. client-conf "localhost" 6627 nimbus-timeout)
+          rpc (.getClient conn)]
+      (testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
+        (is (thrown-cause? NotAliveException (.activate rpc "topo-name"))))
+      (.close conn))))
+  
+(deftest test-noop-authorization-w-simple-transport
+  ;; NoopAuthorizer over the simple transport: activate is expected to raise NotAliveException.
+  (with-test-cluster [6628 nil
+                      "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+                      "backtype.storm.security.auth.SimpleTransportPlugin"]
+    (let [overrides {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+                     STORM-NIMBUS-RETRY-TIMES 0}
+          client-conf (merge (read-storm-config) overrides)
+          conn (NimbusClient. client-conf "localhost" 6628 nimbus-timeout)
+          rpc (.getClient conn)]
+      (testing "(Positive authorization) Authorization plugin should accept client request"
+        (is (thrown-cause? NotAliveException (.activate rpc "topo-name"))))
+      (.close conn))))
+
+(deftest test-deny-authorization-w-simple-transport
+  ;; DenyAuthorizer over the simple transport: every Nimbus call made here must raise AuthorizationException.
+  (with-test-cluster [6629 nil
+                      "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+                      "backtype.storm.security.auth.SimpleTransportPlugin"]
+    (let [overrides {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+                     Config/NIMBUS_HOST "localhost"
+                     Config/NIMBUS_THRIFT_PORT 6629
+                     STORM-NIMBUS-RETRY-TIMES 0}
+          client-conf (merge (read-storm-config) overrides)
+          conn (NimbusClient/getConfiguredClient client-conf)
+          rpc (.getClient conn)
+          submit-opts (SubmitOptions. (TopologyInitialStatus/findByValue 2))]
+      (is (thrown-cause? AuthorizationException (.submitTopology rpc "topo-name" nil nil nil)))
+      (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts rpc "topo-name" nil nil nil submit-opts)))
+      (is (thrown-cause? AuthorizationException (.beginFileUpload rpc)))
+      (is (thrown-cause? AuthorizationException (.uploadChunk rpc nil nil)))
+      (is (thrown-cause? AuthorizationException (.finishFileUpload rpc nil)))
+      (is (thrown-cause? AuthorizationException (.beginFileDownload rpc nil)))
+      (is (thrown-cause? AuthorizationException (.downloadChunk rpc nil)))
+      (is (thrown-cause? AuthorizationException (.getNimbusConf rpc)))
+      (is (thrown-cause? AuthorizationException (.getClusterInfo rpc)))
+      ;; NOTE(review): the stubs presumably let name/id lookups succeed so the calls reach the authorizer -- confirm.
+      (stubbing [nimbus/check-storm-active! nil
+                 nimbus/try-read-storm-conf-from-name {}]
+        (is (thrown-cause? AuthorizationException (.killTopology rpc "topo-name")))
+        (is (thrown-cause? AuthorizationException (.killTopologyWithOpts rpc "topo-name" (KillOptions.))))
+        (is (thrown-cause? AuthorizationException (.activate rpc "topo-name")))
+        (is (thrown-cause? AuthorizationException (.deactivate rpc "topo-name")))
+        (is (thrown-cause? AuthorizationException (.rebalance rpc "topo-name" nil))))
+      (stubbing [nimbus/try-read-storm-conf {}]
+        (is (thrown-cause? AuthorizationException (.getTopologyConf rpc "topo-ID")))
+        (is (thrown-cause? AuthorizationException (.getTopology rpc "topo-ID")))
+        (is (thrown-cause? AuthorizationException (.getUserTopology rpc "topo-ID")))
+        (is (thrown-cause? AuthorizationException (.getTopologyInfo rpc "topo-ID"))))
+      (.close conn))))
+
+(deftest test-noop-authorization-w-sasl-digest
+  ;; NoopAuthorizer over the SASL digest transport: activate is expected to raise NotAliveException.
+  (with-test-cluster [6630
+                      "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                      "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+                      "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+    (let [overrides {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+                     "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                     Config/NIMBUS_HOST "localhost"
+                     Config/NIMBUS_THRIFT_PORT 6630
+                     STORM-NIMBUS-RETRY-TIMES 0}
+          client-conf (merge (read-storm-config) overrides)
+          conn (NimbusClient/getConfiguredClient client-conf)
+          rpc (.getClient conn)]
+      (testing "(Positive authorization) Authorization plugin should accept client request"
+        (is (thrown-cause? NotAliveException (.activate rpc "topo-name"))))
+      (.close conn))))
+
+(deftest test-deny-authorization-w-sasl-digest
+  ;; DenyAuthorizer over the SASL digest transport: every Nimbus call made here must raise AuthorizationException.
+  (with-test-cluster [6631 "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                      "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+                      "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"]
+    (let [overrides {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+                     "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+                     Config/NIMBUS_HOST "localhost"
+                     Config/NIMBUS_THRIFT_PORT 6631
+                     STORM-NIMBUS-RETRY-TIMES 0}
+          client-conf (merge (read-storm-config) overrides)
+          conn (NimbusClient/getConfiguredClient client-conf)
+          rpc (.getClient conn)
+          submit-opts (SubmitOptions. (TopologyInitialStatus/findByValue 2))]
+      (is (thrown-cause? AuthorizationException (.submitTopology rpc "topo-name" nil nil nil)))
+      (is (thrown-cause? AuthorizationException (.submitTopologyWithOpts rpc "topo-name" nil nil nil submit-opts)))
+      (is (thrown-cause? AuthorizationException (.beginFileUpload rpc)))
+      (is (thrown-cause? AuthorizationException (.uploadChunk rpc nil nil)))
+      (is (thrown-cause? AuthorizationException (.finishFileUpload rpc nil)))
+      (is (thrown-cause? AuthorizationException (.beginFileDownload rpc nil)))
+      (is (thrown-cause? AuthorizationException (.downloadChunk rpc nil)))
+      (is (thrown-cause? AuthorizationException (.getNimbusConf rpc)))
+      (is (thrown-cause? AuthorizationException (.getClusterInfo rpc)))
+      ;; NOTE(review): the stubs presumably let name/id lookups succeed so the calls reach the authorizer -- confirm.
+      (stubbing [nimbus/check-storm-active! nil
+                 nimbus/try-read-storm-conf-from-name {}]
+        (is (thrown-cause? AuthorizationException (.killTopology rpc "topo-name")))
+        (is (thrown-cause? AuthorizationException (.killTopologyWithOpts rpc "topo-name" (KillOptions.))))
+        (is (thrown-cause? AuthorizationException (.activate rpc "topo-name")))
+        (is (thrown-cause? AuthorizationException (.deactivate rpc "topo-name")))
+        (is (thrown-cause? AuthorizationException (.rebalance rpc "topo-name" nil))))
+      (stubbing [nimbus/try-read-storm-conf {}]
+        (is (thrown-cause? AuthorizationException (.getTopologyConf rpc "topo-ID")))
+        (is (thrown-cause? AuthorizationException (.getTopology rpc "topo-ID")))
+        (is (thrown-cause? AuthorizationException (.getUserTopology rpc "topo-ID")))
+        (is (thrown-cause? AuthorizationException (.getTopologyInfo rpc "topo-ID"))))
+      (.close conn))))
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/submitter_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/submitter_test.clj b/storm-core/test/clj/backtype/storm/submitter_test.clj
new file mode 100644
index 0000000..6f88297
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/submitter_test.clj
@@ -0,0 +1,75 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.submitter-test
+  (:use [clojure test])
+  (:use [backtype.storm config testing])
+  (:require [clojure.string])
+  (:import [backtype.storm StormSubmitter]))
+
+(deftest test-md5-digest-secret-generation ; each case inspects the map returned by StormSubmitter/prepareZookeeperAuthentication
+  (testing "No payload is generated when a valid one is already present; scheme is still set to digest"
+    (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD "foobar:12345"
+                STORM-ZOOKEEPER-AUTH-SCHEME "anything"} ; conf sets the non-TOPOLOGY scheme key; assertions read the TOPOLOGY one
+          result (StormSubmitter/prepareZookeeperAuthentication conf)
+          actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+          actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+      (is (nil? actual-payload)) ; nil => the result carries no replacement payload
+      (is (= "digest" actual-scheme))))
+
+  (testing "Scheme is set to digest if not already."
+    (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD "foobar:12345"}
+          result (StormSubmitter/prepareZookeeperAuthentication conf)
+          actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+          actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+      (is (nil? actual-payload))
+      (is (= "digest" actual-scheme))))
+
+  (testing "A payload is generated when no payload is present."
+    (let [conf {STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+          result (StormSubmitter/prepareZookeeperAuthentication conf)
+          actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+          actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+      (is (not (clojure.string/blank? actual-payload)))
+      (is (= "digest" actual-scheme))))
+
+  (testing "A payload is generated when payload is not correctly formatted."
+    (let [bogus-payload "not-a-valid-payload"
+          conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD bogus-payload
+                STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+          result (StormSubmitter/prepareZookeeperAuthentication conf)
+          actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+          actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+      (is (not (StormSubmitter/validateZKDigestPayload bogus-payload))) ; guard: the fixture itself must be rejected by the validator
+      (is (not (clojure.string/blank? actual-payload)))
+      (is (= "digest" actual-scheme))))
+
+  (testing "A payload is generated when payload is null."
+    (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD nil
+                STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+          result (StormSubmitter/prepareZookeeperAuthentication conf)
+          actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+          actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+      (is (not (clojure.string/blank? actual-payload)))
+      (is (= "digest" actual-scheme))))
+
+  (testing "A payload is generated when payload is blank."
+    (let [conf {STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD ""
+                STORM-ZOOKEEPER-AUTH-SCHEME "anything"}
+          result (StormSubmitter/prepareZookeeperAuthentication conf)
+          actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
+          actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)]
+      (is (not (clojure.string/blank? actual-payload)))
+      (is (= "digest" actual-scheme)))))