Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:06:58 UTC

[03/14] STORM-216: Added Authentication and Authorization.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj
index 987429b..ba94ab1 100644
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@ -16,6 +16,14 @@
 (ns backtype.storm.cluster-test
   (:import [java.util Arrays])
   (:import [backtype.storm.daemon.common Assignment StormBase SupervisorInfo])
+  (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids])
+  (:import [org.mockito Mockito])
+  (:import [org.mockito.exceptions.base MockitoAssertionError])
+  (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
+  (:import [backtype.storm.utils Utils TestUtils ZookeeperAuthInfo])
+  (:require [backtype.storm [zookeeper :as zk]])
+  (:require [conjure.core])
+  (:use [conjure core])
   (:use [clojure test])
   (:use [backtype.storm cluster config util testing]))
 
@@ -25,7 +33,8 @@
           STORM-ZOOKEEPER-SERVERS ["localhost"]}))
 
 (defn mk-state
-  ([zk-port] (mk-distributed-cluster-state (mk-config zk-port)))
+  ([zk-port] (let [conf (mk-config zk-port)]
+               (mk-distributed-cluster-state conf :auth-conf conf)))
   ([zk-port cb]
      (let [ret (mk-state zk-port)]
        (.register ret cb)
@@ -36,16 +45,16 @@
 (deftest test-basics
   (with-inprocess-zookeeper zk-port
     (let [state (mk-state zk-port)]
-      (.set-data state "/root" (barr 1 2 3))
+      (.set-data state "/root" (barr 1 2 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1 2 3) (.get-data state "/root" false)))
       (is (= nil (.get-data state "/a" false)))
-      (.set-data state "/root/a" (barr 1 2))
-      (.set-data state "/root" (barr 1))
+      (.set-data state "/root/a" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
+      (.set-data state "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1) (.get-data state "/root" false)))
       (is (Arrays/equals (barr 1 2) (.get-data state "/root/a" false)))
-      (.set-data state "/a/b/c/d" (barr 99))
+      (.set-data state "/a/b/c/d" (barr 99) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 99) (.get-data state "/a/b/c/d" false)))
-      (.mkdirs state "/lalala")
+      (.mkdirs state "/lalala" ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= [] (.get-children state "/lalala" false)))
       (is (= #{"root" "a" "lalala"} (set (.get-children state "/" false))))
       (.delete-node state "/a")
@@ -58,7 +67,7 @@
   (with-inprocess-zookeeper zk-port
     (let [state1 (mk-state zk-port)
           state2 (mk-state zk-port)]
-      (.set-data state1 "/root" (barr 1))
+      (.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1) (.get-data state1 "/root" false)))
       (is (Arrays/equals (barr 1) (.get-data state2 "/root" false)))
       (.delete-node state2 "/root")
@@ -73,7 +82,7 @@
     (let [state1 (mk-state zk-port)
           state2 (mk-state zk-port)
           state3 (mk-state zk-port)]
-      (.set-ephemeral-node state1 "/a" (barr 1))
+      (.set-ephemeral-node state1 "/a" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
       (is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
       (.close state3)
@@ -111,37 +120,37 @@
           state1 (mk-state zk-port state1-cb)
           [state2-last-cb state2-cb] (mk-callback-tester)
           state2 (mk-state zk-port state2-cb)]
-      (.set-data state1 "/root" (barr 1))
+      (.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (.get-data state2 "/root" true)
       (is (= nil @state1-last-cb))
       (is (= nil @state2-last-cb))
-      (.set-data state2 "/root" (barr 2))
+      (.set-data state2 "/root" (barr 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb)))
       (is (= nil @state1-last-cb))
 
-      (.set-data state2 "/root" (barr 3))
+      (.set-data state2 "/root" (barr 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= nil @state2-last-cb))
       (.get-data state2 "/root" true)
       (.get-data state2 "/root" false)
       (.delete-node state1 "/root")
       (is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb)))
       (.get-data state2 "/root" true)
-      (.set-ephemeral-node state1 "/root" (barr 1 2 3 4))
+      (.set-ephemeral-node state1 "/root" (barr 1 2 3 4) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= {:type :node-created :path "/root"} (read-and-reset! state2-last-cb)))
 
       (.get-children state1 "/" true)
-      (.set-data state2 "/a" (barr 9))
+      (.set-data state2 "/a" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= nil @state2-last-cb))
       (is (= {:type :node-children-changed :path "/"} (read-and-reset! state1-last-cb)))
 
       (.get-data state2 "/root" true)
-      (.set-ephemeral-node state1 "/root" (barr 1 2))
+      (.set-ephemeral-node state1 "/root" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb)))
 
-      (.mkdirs state1 "/ccc")
+      (.mkdirs state1 "/ccc" ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (.get-children state1 "/ccc" true)
       (.get-data state2 "/ccc/b" true)
-      (.set-data state2 "/ccc/b" (barr 8))
+      (.set-data state2 "/ccc/b" (barr 8) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= {:type :node-created :path "/ccc/b"} (read-and-reset! state2-last-cb)))
       (is (= {:type :node-children-changed :path "/ccc"} (read-and-reset! state1-last-cb)))
 
@@ -150,7 +159,7 @@
       (.close state1)
 
       (is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb)))
-      (.set-data state2 "/root2" (barr 9))
+      (.set-data state2 "/root2" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (= {:type :node-created :path "/root2"} (read-and-reset! state2-last-cb)))
       (.close state2)
       )))
@@ -161,8 +170,8 @@
     (let [state (mk-storm-state zk-port)
           assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
           assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
-          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {})
-          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {})]
+          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "")
+          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "")]
       (is (= [] (.assignments state nil)))
       (.set-assignment! state "storm1" assignment1)
       (is (= assignment1 (.assignment-info state "storm1" nil)))
@@ -186,6 +195,11 @@
       (is (= base2 (.storm-base state "storm2" nil)))
       (is (= #{"storm2"} (set (.active-storms state))))
 
+      (is (nil? (.credentials state "storm1" nil)))
+      (.set-credentials! state "storm1" {"a" "a"} {})
+      (is (= {"a" "a"} (.credentials state "storm1" nil)))
+      (.set-credentials! state "storm1" {"b" "b"} {})
+      (is (= {"b" "b"} (.credentials state "storm1" nil)))
 
       ;; TODO add tests for task info and task heartbeat setting and getting
       (.disconnect state)
@@ -240,9 +254,44 @@
       (.disconnect state1)
       )))
 
+
+
+(deftest test-cluster-authentication
+  (with-inprocess-zookeeper zk-port
+    (let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
+          conf (merge
+                (mk-config zk-port)
+                {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
+                 STORM-ZOOKEEPER-SESSION-TIMEOUT 10
+                 STORM-ZOOKEEPER-RETRY-INTERVAL 5
+                 STORM-ZOOKEEPER-RETRY-TIMES 2
+                 STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
+                 STORM-ZOOKEEPER-AUTH-SCHEME "digest"
+                 STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
+      (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
+      (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
+      (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
+      (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
+      (is (nil?
+           (try
+             (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
+             (catch MockitoAssertionError e
+               e)))))))
+
 (deftest test-storm-state-callbacks
   ;; TODO finish
   )
 
-
-
+(deftest test-cluster-state-default-acls
+  (testing "The default ACLs are empty."
+    (stubbing [zk/mkdirs nil
+               zk/mk-client (reify CuratorFramework (^void close [this] nil))]
+      (mk-distributed-cluster-state {})
+      (verify-call-times-for zk/mkdirs 1)
+      (verify-first-call-args-for-indices zk/mkdirs [2] nil))
+    (stubbing [mk-distributed-cluster-state nil
+               register nil
+               mkdirs nil]
+      (mk-storm-cluster-state {})
+      (verify-call-times-for mk-distributed-cluster-state 1)
+      (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil))))

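A note on the API change exercised above (a sketch, not part of the patch): every cluster-state write operation now takes an explicit ZooKeeper ACL, and mk-distributed-cluster-state now receives the auth configuration used to build digest credentials. Assuming `state` is a cluster-state instance built as in mk-state above, the calling pattern the tests exercise looks like:

    (import '[org.apache.zookeeper ZooDefs$Ids])

    ;; writes pass an ACL explicitly; the tests use the open, unrestricted ACL
    (.set-data state "/root" (byte-array [1 2 3]) ZooDefs$Ids/OPEN_ACL_UNSAFE)
    (.mkdirs state "/lalala" ZooDefs$Ids/OPEN_ACL_UNSAFE)

    ;; digest authentication is driven by configuration (placeholder credentials here)
    ;; {STORM-ZOOKEEPER-AUTH-SCHEME  "digest"
    ;;  STORM-ZOOKEEPER-AUTH-PAYLOAD "user:password"}
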
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 01f788b..2b7f96e 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -84,6 +84,17 @@
     (is (thrown-cause? java.lang.IllegalArgumentException
       (.validateField validator "test" 42)))))
 
+(deftest test-positive-integer-validator
+  (let [validator ConfigValidation/PositiveIntegerValidator]
+    (doseq [x [42.42 -32 0 -0 "Forty-two"]]
+      (is (thrown-cause? java.lang.IllegalArgumentException
+        (.validateField validator "test" x))))
+
+    (doseq [x [42 4294967296 1 nil]]
+      (is (nil? (try
+                  (.validateField validator "test" x)
+                  (catch Exception e e)))))))
+
 (deftest test-worker-childopts-is-string-or-string-list
   (let [pass-cases [nil "some string" ["some" "string" "list"]]]
     (testing "worker.childopts validates"

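For reference, a quick usage sketch of the validator the new test exercises (assuming the ConfigValidation class used by the test above; the config key string here is hypothetical):

    (import '[backtype.storm ConfigValidation])

    ;; positive integers (and nil) pass silently...
    (.validateField ConfigValidation/PositiveIntegerValidator "some.positive.int" 42)
    ;; ...while zero, negatives, floats, and strings throw IllegalArgumentException.
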
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/logviewer_test.clj b/storm-core/test/clj/backtype/storm/logviewer_test.clj
new file mode 100644
index 0000000..37e63b9
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/logviewer_test.clj
@@ -0,0 +1,187 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.logviewer-test
+  (:use [backtype.storm config util])
+  (:require [backtype.storm.daemon [logviewer :as logviewer]
+                                   [supervisor :as supervisor]])
+  (:require [conjure.core])
+  (:use [clojure test])
+  (:use [conjure core])
+  (:import [org.mockito Mockito]))
+
+(defmulti mk-mock-File #(:type %))
+
+(defmethod mk-mock-File :file [{file-name :name mtime :mtime
+                                :or {file-name "afile" mtime 1}}]
+  (let [mockFile (Mockito/mock java.io.File)]
+    (. (Mockito/when (.getName mockFile)) thenReturn file-name)
+    (. (Mockito/when (.lastModified mockFile)) thenReturn mtime)
+    (. (Mockito/when (.isFile mockFile)) thenReturn true)
+    (. (Mockito/when (.getCanonicalPath mockFile))
+       thenReturn (str "/mock/canonical/path/to/" file-name))
+    mockFile))
+
+(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime
+                                     :or {dir-name "adir" mtime 1}}]
+  (let [mockDir (Mockito/mock java.io.File)]
+    (. (Mockito/when (.getName mockDir)) thenReturn dir-name)
+    (. (Mockito/when (.lastModified mockDir)) thenReturn mtime)
+    (. (Mockito/when (.isFile mockDir)) thenReturn false)
+    mockDir))
+
+(deftest test-mk-FileFilter-for-log-cleanup
+  (testing "log file filter selects the correct log files for purge"
+    (let [now-millis (current-time-millis)
+          conf {LOGVIEWER-CLEANUP-AGE-MINS 60
+                LOGVIEWER-CLEANUP-INTERVAL-SECS 300}
+          cutoff-millis (logviewer/cleanup-cutoff-age-millis conf now-millis)
+          old-mtime-millis (- cutoff-millis 500)
+          new-mtime-millis (+ cutoff-millis 500)
+          matching-files (map #(mk-mock-File %)
+                              [{:name "oldlog-1-2-worker-3.log"
+                                :type :file
+                                :mtime old-mtime-millis}
+                               {:name "oldlog-1-2-worker-3.log.8"
+                                :type :file
+                                :mtime old-mtime-millis}
+                               {:name "foobar*_topo-1-24242-worker-2834238.log"
+                                :type :file
+                                :mtime old-mtime-millis}])
+          excluded-files (map #(mk-mock-File %)
+                              [{:name "oldlog-1-2-worker-.log"
+                                :type :file
+                                :mtime old-mtime-millis}
+                               {:name "olddir-1-2-worker.log"
+                                :type :directory
+                                :mtime old-mtime-millis}
+                               {:name "newlog-1-2-worker.log"
+                                :type :file
+                                :mtime new-mtime-millis}
+                               {:name "some-old-file.txt"
+                                :type :file
+                                :mtime old-mtime-millis}
+                               {:name "metadata"
+                                :type :directory
+                                :mtime old-mtime-millis}
+                               {:name "newdir-1-2-worker.log"
+                                :type :directory
+                                :mtime new-mtime-millis}
+                               {:name "newdir"
+                                :type :directory
+                                :mtime new-mtime-millis}
+                              ])
+          file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)]
+        (is   (every? #(.accept file-filter %) matching-files))
+        (is (not-any? #(.accept file-filter %) excluded-files))
+      )))
+
+(deftest test-get-log-root->files-map
+  (testing "returns map of root name to list of files"
+    (let [files (vec (map #(java.io.File. %) ["log-1-2-worker-3.log"
+                                              "log-1-2-worker-3.log.1"
+                                              "log-2-4-worker-6.log.1"]))
+          expected {"log-1-2-worker-3" #{(files 0) (files 1)}
+                    "log-2-4-worker-6" #{(files 2)}}]
+      (is (= expected (logviewer/get-log-root->files-map files))))))
+
+(deftest test-identify-worker-log-files
+  (testing "Does not include metadata file when there are any log files that
+           should not be cleaned up"
+    (let [cutoff-millis 2000
+          old-logFile (mk-mock-File {:name "mock-1-1-worker-1.log.1"
+                                     :type :file
+                                     :mtime (- cutoff-millis 1000)})
+          mock-metaFile (mk-mock-File {:name "mock-1-1-worker-1.yaml"
+                                       :type :file
+                                       :mtime 1})
+          new-logFile (mk-mock-File {:name "mock-1-1-worker-1.log"
+                                     :type :file
+                                     :mtime (+ cutoff-millis 1000)})
+          exp-id "id12345"
+          exp-user "alice"
+          expected {exp-id {:owner exp-user
+                            :files #{old-logFile}}}]
+      (stubbing [supervisor/read-worker-heartbeats nil
+                logviewer/get-metadata-file-for-log-root-name mock-metaFile
+                read-dir-contents [(.getName old-logFile) (.getName new-logFile)]
+                logviewer/get-worker-id-from-metadata-file exp-id
+                logviewer/get-topo-owner-from-metadata-file exp-user]
+        (is (= expected (logviewer/identify-worker-log-files [old-logFile] "/tmp/")))))))
+
+(deftest test-get-dead-worker-files-and-owners
+  (testing "removes any files of workers that are still alive"
+    (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
+          id->hb {"42" {:time-secs 1}}
+          now-secs 2
+          log-files #{:expected-file :unexpected-file}
+          exp-owner "alice"]
+      (stubbing [logviewer/identify-worker-log-files {"42" {:owner exp-owner
+                                                            :files #{:unexpected-file}}
+                                                      "007" {:owner exp-owner
+                                                             :files #{:expected-file}}}
+                 logviewer/get-topo-owner-from-metadata-file "alice"
+                 supervisor/read-worker-heartbeats id->hb]
+        (is (= [{:owner exp-owner :files #{:expected-file}}]
+               (logviewer/get-dead-worker-files-and-owners conf now-secs log-files "/tmp/")))))))
+
+(deftest test-cleanup-fn
+  (testing "cleanup function removes file as user when one is specified"
+    (let [exp-user "mock-user"
+          mockfile1 (mk-mock-File {:name "file1" :type :file})
+          mockfile2 (mk-mock-File {:name "file2" :type :file})
+          mockfile3 (mk-mock-File {:name "file3" :type :file})
+          mockyaml  (mk-mock-File {:name "foo.yaml" :type :file})
+          exp-cmd (str "rmr /mock/canonical/path/to/" (.getName mockfile3))]
+      (stubbing [logviewer/select-files-for-cleanup
+                   [(mk-mock-File {:name "throwaway" :type :file})]
+                 logviewer/get-dead-worker-files-and-owners
+                   [{:owner nil :files #{mockfile1}}
+                    {:files #{mockfile2}}
+                    {:owner exp-user :files #{mockfile3 mockyaml}}]
+                 supervisor/worker-launcher nil
+                 rmr nil]
+        (logviewer/cleanup-fn! "/tmp/")
+        (verify-call-times-for supervisor/worker-launcher 1)
+        (verify-first-call-args-for-indices supervisor/worker-launcher
+                                            [1 2] exp-user exp-cmd)
+        (verify-call-times-for rmr 3)
+        (verify-nth-call-args-for 1 rmr (.getCanonicalPath mockfile1))
+        (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))
+        (verify-nth-call-args-for 3 rmr (.getCanonicalPath mockyaml))))))
+
+(deftest test-authorized-log-user
+  (testing "allow cluster admin"
+    (let [conf {NIMBUS-ADMINS ["alice"]}]
+      (stubbing [logviewer/get-log-user-whitelist []]
+        (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)))))
+
+  (testing "ignore any cluster-set topology.users"
+    (let [conf {TOPOLOGY-USERS ["alice"]}]
+      (stubbing [logviewer/get-log-user-whitelist []]
+        (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" conf))))))
+
+  (testing "allow cluster logs user"
+    (let [conf {LOGS-USERS ["alice"]}]
+      (stubbing [logviewer/get-log-user-whitelist []]
+        (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)))))
+
+  (testing "allow whitelisted topology user"
+    (stubbing [logviewer/get-log-user-whitelist ["alice"]]
+      (is (logviewer/authorized-log-user? "alice" "non-blank-fname" {}))))
+
+  (testing "disallow user not in nimbus admin, topo user, logs user, or whitelist"
+    (stubbing [logviewer/get-log-user-whitelist []]
+      (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" {}))))))

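The mk-mock-File helpers above drive Mockito directly from Clojure; the same when/thenReturn interop pattern in isolation looks like this (a sketch only, not part of the patch):

    (import '[org.mockito Mockito])

    (let [f (Mockito/mock java.io.File)]
      ;; record the call on the mock, then stub its return value
      (. (Mockito/when (.getName f)) thenReturn "worker-1-2-3.log")
      (. (Mockito/when (.isFile f)) thenReturn true)
      [(.getName f) (.isFile f)])  ;; => ["worker-1-2-3.log" true]
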
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index 31e69e8..8534c82 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -30,8 +30,7 @@
                                                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
                                                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                                                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                                                    }]
+                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}]
     (let [topology (thrift/mk-topology
                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
                      {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 1e6554a..c902119 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -15,12 +15,15 @@
 ;; limitations under the License.
 (ns backtype.storm.nimbus-test
   (:use [clojure test])
+  (:require [backtype.storm [util :as util]])
   (:require [backtype.storm.daemon [nimbus :as nimbus]])
-  
   (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
   (:import [backtype.storm.scheduler INimbus])
+  (:import [backtype.storm.generated Credentials])
   (:use [backtype.storm bootstrap testing])
   (:use [backtype.storm.daemon common])
+  (:require [conjure.core])
+  (:use [conjure core])
   )
 
 (bootstrap)
@@ -159,6 +162,8 @@
       (is (not-nil? ((:executor->start-time-secs assignment) e))))
     ))
 
+ 	
+
 (deftest test-bogusId
   (with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
     (let [state (:storm-cluster-state cluster)
@@ -167,6 +172,7 @@
        (is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
        (is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
        (is (thrown? NotAliveException (.getTopologyInfo nimbus "bogus-id")))
+       (is (thrown? NotAliveException (.uploadNewCredentials nimbus "bogus-id" (Credentials.))))
       )))
 
 (deftest test-assignment
@@ -724,10 +730,77 @@
       
       )))
 
+
+(defn check-for-collisions [state]
+ (log-message "Checking for collision")
+ (let [assignments (.assignments state nil)]
+   (log-message "Assignments: " assignments)
+   (let [id->node->ports (into {} (for [id assignments
+                                                :let [executor->node+port (:executor->node+port (.assignment-info state id nil))
+                                                      node+ports (set (.values executor->node+port))
+                                                      node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [[node port] node+ports] {node [port]}))]]
+                                                {id node->ports}))
+         _ (log-message "id->node->ports: " id->node->ports)
+         all-nodes (apply merge-with (fn [a b] 
+                                        (let [ret (concat a b)]
+                                              (log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret)) 
+                                              (is (apply distinct? ret))
+                                              (distinct ret)))
+                          (.values id->node->ports))]
+)))
+
+(deftest test-rebalance-constrained-cluster
+  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 4
+    :daemon-conf {SUPERVISOR-ENABLE false
+                  NIMBUS-MONITOR-FREQ-SECS 10
+                  TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                  TOPOLOGY-ACKER-EXECUTORS 0}]
+    (letlocals
+      (bind topology (thrift/mk-topology
+                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+                        {}))
+      (bind topology2 (thrift/mk-topology
+                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+                        {}))
+      (bind topology3 (thrift/mk-topology
+                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+                        {}))
+      (bind state (:storm-cluster-state cluster))
+      (submit-local-topology (:nimbus cluster)
+                             "test"
+                             {TOPOLOGY-WORKERS 3
+                              TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
+      (submit-local-topology (:nimbus cluster)
+                             "test2"
+                             {TOPOLOGY-WORKERS 3
+                              TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
+      (submit-local-topology (:nimbus cluster)
+                             "test3"
+                             {TOPOLOGY-WORKERS 3
+                              TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
+
+      (advance-cluster-time cluster 31)
+
+      (check-for-collisions state)
+      (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.)
+                    (.set_num_workers 4)
+                    (.set_wait_secs 0)
+                    ))
+
+      (advance-cluster-time cluster 11)
+      (check-for-collisions state)
+
+      (advance-cluster-time cluster 30)
+      (check-for-collisions state)
+      )))
+
+
 (deftest test-submit-invalid
   (with-simulated-time-local-cluster [cluster
     :daemon-conf {SUPERVISOR-ENABLE false
-                  TOPOLOGY-ACKER-EXECUTORS 0}]
+                  TOPOLOGY-ACKER-EXECUTORS 0
+                  NIMBUS-EXECUTORS-PER-TOPOLOGY 8
+                  NIMBUS-SLOTS-PER-TOPOLOGY 8}]
     (letlocals
       (bind topology (thrift/mk-topology
                         {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 0 :conf {TOPOLOGY-TASKS 1})}
@@ -747,7 +820,31 @@
                                "test/aaa"
                                {}
                                topology)))
-      )))
+      (bind topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
+                                                 :parallelism-hint 16
+                                                 :conf {TOPOLOGY-TASKS 16})}
+                      {}))
+      (bind state (:storm-cluster-state cluster))
+      (is (thrown? InvalidTopologyException
+                   (submit-local-topology (:nimbus cluster)
+                                          "test"
+                                          {TOPOLOGY-WORKERS 3} 
+                                          topology)))
+      (bind topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
+                                                 :parallelism-hint 5
+                                                 :conf {TOPOLOGY-TASKS 5})}
+                      {}))
+      (is (thrown? InvalidTopologyException
+                   (submit-local-topology (:nimbus cluster)
+                                          "test"
+                                          {TOPOLOGY-WORKERS 16}
+                                          topology)))
+      (is (nil? (submit-local-topology (:nimbus cluster)
+                                       "test"
+                                       {TOPOLOGY-WORKERS 8}
+                                       topology))))))
 
 (deftest test-cleans-corrupt
   (with-inprocess-zookeeper zk-port
@@ -775,13 +872,13 @@
        (.disconnect cluster-state)
        ))))
 
-(deftest test-no-overlapping-slots
-  ;; test that same node+port never appears across 2 assignments
-  )
+;(deftest test-no-overlapping-slots
+;  ;; test that same node+port never appears across 2 assignments
+;  )
 
-(deftest test-stateless
-  ;; test that nimbus can die and restart without any problems
-  )
+;(deftest test-stateless
+;  ;; test that nimbus can die and restart without any problems
+;  )
 
 (deftest test-clean-inbox
   "Tests that the inbox correctly cleans jar files."
@@ -816,11 +913,228 @@
        (assert-files-in-dir [])
        ))))
 
+(deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
+  (with-local-cluster [cluster 
+                       :daemon-conf {NIMBUS-AUTHORIZER 
+                          "backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
+    (let [
+          nimbus (:nimbus cluster)
+          topology (thrift/mk-topology {} {})
+         ]
+      (is (thrown? AuthorizationException
+          (submit-local-topology-with-opts nimbus "mystorm" {} topology 
+            (SubmitOptions. TopologyInitialStatus/INACTIVE))
+        ))
+    )
+  )
+)
+
+(deftest test-nimbus-iface-methods-check-authorization
+  (with-local-cluster [cluster 
+                       :daemon-conf {NIMBUS-AUTHORIZER 
+                          "backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
+    (let [
+          nimbus (:nimbus cluster)
+          topology (thrift/mk-topology {} {})
+         ]
+      ; Fake good authorization as part of setup.
+      (mocking [nimbus/check-authorization!]
+          (submit-local-topology-with-opts nimbus "test" {} topology 
+              (SubmitOptions. TopologyInitialStatus/INACTIVE))
+      )
+      (stubbing [nimbus/storm-active? true]
+        (is (thrown? AuthorizationException
+          (.rebalance nimbus "test" (RebalanceOptions.))
+          ))
+      )
+      (is (thrown? AuthorizationException
+        (.activate nimbus "test")
+        ))
+      (is (thrown? AuthorizationException
+        (.deactivate nimbus "test")
+        ))
+    )
+  )
+)
+
+(deftest test-nimbus-check-authorization-params
+  (with-local-cluster [cluster
+                       :daemon-conf {NIMBUS-AUTHORIZER "backtype.storm.security.auth.authorizer.NoopAuthorizer"}]
+    (let [nimbus (:nimbus cluster)
+          topology-name "test-nimbus-check-autho-params"
+          topology (thrift/mk-topology {} {})]
+
+      (submit-local-topology-with-opts nimbus topology-name {} topology
+          (SubmitOptions. TopologyInitialStatus/INACTIVE))
+
+      (let [expected-name topology-name
+            expected-conf {TOPOLOGY-NAME expected-name
+                           :foo :bar}]
+
+        (testing "getTopologyConf calls check-authorization! with the correct parameters."
+          (let [expected-operation "getTopologyConf"]
+            (stubbing [nimbus/check-authorization! nil
+                       nimbus/try-read-storm-conf expected-conf
+                       util/to-json nil]
+              (try
+                (.getTopologyConf nimbus "fake-id")
+                (catch NotAliveException e)
+                (finally
+                  (verify-first-call-args-for-indices
+                    nimbus/check-authorization!
+                      [1 2 3] expected-name expected-conf expected-operation)
+                  (verify-first-call-args-for util/to-json expected-conf))))))
+
+        (testing "getTopology calls check-authorization! with the correct parameters."
+          (let [expected-operation "getTopology"]
+            (stubbing [nimbus/check-authorization! nil
+                       nimbus/try-read-storm-conf expected-conf
+                       nimbus/try-read-storm-topology nil
+                       system-topology! nil]
+              (try
+                (.getTopology nimbus "fake-id")
+                (catch NotAliveException e)
+                (finally
+                  (verify-first-call-args-for-indices
+                    nimbus/check-authorization!
+                      [1 2 3] expected-name expected-conf expected-operation)
+                  (verify-first-call-args-for-indices
+                    system-topology! [0] expected-conf))))))
+
+        (testing "getUserTopology calls check-authorization with the correct parameters."
+          (let [expected-operation "getUserTopology"]
+            (stubbing [nimbus/check-authorization! nil
+                       nimbus/try-read-storm-conf expected-conf
+                       nimbus/try-read-storm-topology nil]
+              (try
+                (.getUserTopology nimbus "fake-id")
+                (catch NotAliveException e)
+                (finally
+                  (verify-first-call-args-for-indices
+                    nimbus/check-authorization!
+                      [1 2 3] expected-name expected-conf expected-operation)
+                  (verify-first-call-args-for-indices
+                    nimbus/try-read-storm-topology [0] expected-conf))))))))))
+
+(deftest test-nimbus-iface-getTopology-methods-throw-correctly
+  (with-local-cluster [cluster]
+    (let [
+          nimbus (:nimbus cluster)
+          id "bogus ID"
+         ]
+      (is (thrown? NotAliveException (.getTopology nimbus id)))
+      (try
+        (.getTopology nimbus id)
+        (catch NotAliveException e
+           (is (= id (.get_msg e)))
+        )
+      )
+
+      (is (thrown? NotAliveException (.getTopologyConf nimbus id)))
+      (try (.getTopologyConf nimbus id)
+        (catch NotAliveException e
+           (is (= id (.get_msg e)))
+        )
+      )
+
+      (is (thrown? NotAliveException (.getTopologyInfo nimbus id)))
+      (try (.getTopologyInfo nimbus id)
+        (catch NotAliveException e
+           (is (= id (.get_msg e)))
+        )
+      )
+
+      (is (thrown? NotAliveException (.getUserTopology nimbus id)))
+      (try (.getUserTopology nimbus id)
+        (catch NotAliveException e
+           (is (= id (.get_msg e)))
+        )
+      )
+    )
+  )
+)
+
+(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
+  (with-local-cluster [cluster]
+    (let [
+          nimbus (:nimbus cluster)
+          bogus-secs 42
+          bogus-type "bogusType"
+          bogus-bases {
+                 "1" nil
+                 "2" {:launch-time-secs bogus-secs
+                        :storm-name "id2-name"
+                        :status {:type bogus-type}}
+                 "3" nil
+                 "4" {:launch-time-secs bogus-secs
+                        :storm-name "id4-name"
+                        :status {:type bogus-type}}
+                }
+        ]
+      (stubbing [topology-bases bogus-bases]
+        (let [topos (.get_topologies (.getClusterInfo nimbus))]
+          ; The number of topologies in the summary is correct.
+          (is (= (count 
+            (filter (fn [b] (second b)) bogus-bases)) (count topos)))
+          ; Each topology present has a valid name.
+          (is (empty?
+            (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos)))
+          ; The topologies are those with valid bases.
+          (is (empty?
+            (filter (fn [t] 
+              (or 
+                (nil? t) 
+                (not (number? (read-string (.get_id t))))
+                (odd? (read-string (.get_id t)))
+              )) topos)))
+        )
+      )
+    )
+  )
+)
+
+(deftest test-defserverfn-numbus-iface-instance
+  (test-nimbus-iface-submitTopologyWithOpts-checks-authorization)
+  (test-nimbus-iface-methods-check-authorization)
+  (test-nimbus-iface-getTopology-methods-throw-correctly)
+  (test-nimbus-iface-getClusterInfo-filters-topos-without-bases)
+)
+
+(deftest test-nimbus-data-acls
+  (testing "nimbus-data uses correct ACLs"
+    (let [scheme "digest"
+          digest "storm:thisisapoorpassword"
+          auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
+                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
+          expected-acls nimbus/NIMBUS-ZK-ACLS
+          fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
+      (stubbing [mk-authorization-handler nil
+                 cluster/mk-storm-cluster-state nil
+                 nimbus/file-cache-map nil
+                 uptime-computer nil
+                 new-instance nil
+                 mk-timer nil
+                 nimbus/mk-scheduler nil]
+        (nimbus/nimbus-data auth-conf fake-inimbus)
+        (verify-call-times-for cluster/mk-storm-cluster-state 1)
+        (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
+                                            expected-acls)))))
+
+(deftest test-file-bogus-download
+  (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
+    (let [nimbus (:nimbus cluster)]
+      (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus nil)))
+      (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "")))
+      (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "/bogus-path/foo")))
+      )))
+
 (deftest test-validate-topo-config-on-submit
   (with-local-cluster [cluster]
     (let [nimbus (:nimbus cluster)
           topology (thrift/mk-topology {} {})
-          bad-config {"topology.workers" "3"}]
-      (is (thrown-cause? InvalidTopologyException
-        (submit-local-topology-with-opts nimbus "test" bad-config topology
-                                         (SubmitOptions.)))))))
+          bad-config {"topology.isolate.machines" "2"}]
+      ; Fake good authorization as part of setup.
+      (mocking [nimbus/check-authorization!]
+        (is (thrown-cause? InvalidTopologyException
+          (submit-local-topology-with-opts nimbus "test" bad-config topology
+                                           (SubmitOptions.))))))))

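Condensed from the authorization tests above (same conf key and authorizer class as in the diff; the topology name is arbitrary, and the snippet assumes the nimbus_test namespace): with DenyAuthorizer configured on Nimbus, submission is expected to fail with AuthorizationException before the topology is accepted.

    (with-local-cluster [cluster
                         :daemon-conf {NIMBUS-AUTHORIZER
                                       "backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
      (is (thrown? AuthorizationException
                   (submit-local-topology-with-opts (:nimbus cluster) "mystorm" {}
                                                    (thrift/mk-topology {} {})
                                                    (SubmitOptions. TopologyInitialStatus/INACTIVE)))))
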
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
new file mode 100644
index 0000000..4e79240
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
@@ -0,0 +1,737 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.scheduler.multitenant-scheduler-test
+  (:use [clojure test])
+  (:use [backtype.storm bootstrap config testing])
+  (:require [backtype.storm.daemon [nimbus :as nimbus]])
+  (:import [backtype.storm.generated StormTopology])
+  (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
+            SchedulerAssignmentImpl Topologies TopologyDetails])
+  (:import [backtype.storm.scheduler.multitenant Node NodePool FreePool DefaultPool
+            IsolatedPool MultitenantScheduler]))
+
+(bootstrap)
+
+(defn gen-supervisors [count]
+  (into {} (for [id (range count)
+                :let [supervisor (SupervisorDetails. (str "super" id) (str "host" id) (list ) (map int (list 1 2 3 4)))]]
+            {(.getId supervisor) supervisor})))
+
+(defn to-top-map [topologies]
+  (into {} (for [top topologies] {(.getId top) top})))
+
+(defn ed [id] (ExecutorDetails. (int id) (int id)))
+
+(defn mk-ed-map [arg]
+  (into {}
+    (for [[name start end] arg]
+      (into {}
+        (for [at (range start end)]
+          {(ed at) name})))))
+
+(deftest test-node
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)]
+    (is (= 5 (.size node-map)))
+    (let [node (.get node-map "super0")]
+      (is (= "super0" (.getId node)))
+      (is (= true (.isAlive node)))
+      (is (= 0 (.size (.getRunningTopologies node))))
+      (is (= true (.isTotallyFree node)))
+      (is (= 4 (.totalSlotsFree node)))
+      (is (= 0 (.totalSlotsUsed node)))
+      (is (= 4 (.totalSlots node)))
+      (.assign node "topology1" (list (ExecutorDetails. 1 1)) cluster)
+      (is (= 1 (.size (.getRunningTopologies node))))
+      (is (= false (.isTotallyFree node)))
+      (is (= 3 (.totalSlotsFree node)))
+      (is (= 1 (.totalSlotsUsed node)))
+      (is (= 4 (.totalSlots node)))
+      (.assign node "topology1" (list (ExecutorDetails. 2 2)) cluster)
+      (is (= 1 (.size (.getRunningTopologies node))))
+      (is (= false (.isTotallyFree node)))
+      (is (= 2 (.totalSlotsFree node)))
+      (is (= 2 (.totalSlotsUsed node)))
+      (is (= 4 (.totalSlots node)))
+      (.assign node "topology2" (list (ExecutorDetails. 1 1)) cluster)
+      (is (= 2 (.size (.getRunningTopologies node))))
+      (is (= false (.isTotallyFree node)))
+      (is (= 1 (.totalSlotsFree node)))
+      (is (= 3 (.totalSlotsUsed node)))
+      (is (= 4 (.totalSlots node)))
+      (.assign node "topology2" (list (ExecutorDetails. 2 2)) cluster)
+      (is (= 2 (.size (.getRunningTopologies node))))
+      (is (= false (.isTotallyFree node)))
+      (is (= 0 (.totalSlotsFree node)))
+      (is (= 4 (.totalSlotsUsed node)))
+      (is (= 4 (.totalSlots node)))
+      (.freeAllSlots node cluster)
+      (is (= 0 (.size (.getRunningTopologies node))))
+      (is (= true (.isTotallyFree node)))
+      (is (= 4 (.totalSlotsFree node)))
+      (is (= 0 (.totalSlotsUsed node)))
+      (is (= 4 (.totalSlots node)))
+    )))
+
+(deftest test-free-pool
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list (ExecutorDetails. 1 1)) cluster)
+    (.init free-pool cluster node-map)
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (let [ns-count-1 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 1)
+          ns-count-3 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 3)
+          ns-count-4 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 4)
+          ns-count-5 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 5)]
+      (is (= 1 (._nodes ns-count-1)))
+      (is (= 4 (._slots ns-count-1)))
+      (is (= 1 (._nodes ns-count-3)))
+      (is (= 4 (._slots ns-count-3)))
+      (is (= 1 (._nodes ns-count-4)))
+      (is (= 4 (._slots ns-count-4)))
+      (is (= 2 (._nodes ns-count-5)))
+      (is (= 8 (._slots ns-count-5)))
+    )
+    (let [nodes (.takeNodesBySlots free-pool 5)]
+      (is (= 2 (.size nodes)))
+      (is (= 8 (Node/countFreeSlotsAlive nodes)))
+      (is (= 8 (Node/countTotalSlotsAlive nodes)))
+      (is (= 2 (.nodesAvailable free-pool)))
+      (is (= (* 2 4) (.slotsAvailable free-pool)))
+    )
+    (let [nodes (.takeNodes free-pool 3)] ;;Only 2 should be left
+      (is (= 2 (.size nodes)))
+      (is (= 8 (Node/countFreeSlotsAlive nodes)))
+      (is (= 8 (Node/countTotalSlotsAlive nodes)))
+      (is (= 0 (.nodesAvailable free-pool)))
+      (is (= 0 (.slotsAvailable free-pool)))
+    )))
+
+(deftest test-default-pool-simple
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       default-pool (DefaultPool. )
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   2
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init default-pool cluster node-map)
+    (is (= true (.canAdd default-pool topology1)))
+    (.addTopology default-pool topology1)
+    ;;Only 1 node is in the default-pool because only one node was scheduled already
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 2 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-default-pool-big-request
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       default-pool (DefaultPool. )
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   5
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init default-pool cluster node-map)
+    (is (= true (.canAdd default-pool topology1)))
+    (.addTopology default-pool topology1)
+    ;;Only 1 node is in the default-pool because only one node was scheduled already
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 3 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= "Fully Scheduled (requested 5 slots, but could only use 3)" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-default-pool-big-request-2
+  (let [supers (gen-supervisors 1)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       default-pool (DefaultPool. )
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor4 (ed 4)
+       executor5 (ed 5)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   5
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt1"
+                    executor4 "bolt1"
+                    executor5 "bolt2"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init default-pool cluster node-map)
+    (is (= true (.canAdd default-pool topology1)))
+    (.addTopology default-pool topology1)
+    ;;Only 1 node is in the default-pool because only one node was scheduled already
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= 0 (.slotsAvailable free-pool)))
+    (is (= 0 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= 0 (.slotsAvailable free-pool)))
+    (is (= 0 (.nodesAvailable free-pool)))
+    (is (= 4 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= "Running with fewer slots than requested (4/5)" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-default-pool-full
+  (let [supers (gen-supervisors 2) ;;make 2 supervisors but only schedule with one of them
+       single-super {(ffirst supers) (second (first supers))}
+       single-cluster (Cluster. (nimbus/standalone-nimbus) single-super {})
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor4 (ed 4)
+       executor5 (ed 5)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   5
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"
+                    executor4 "bolt3"
+                    executor5 "bolt4"})]
+    (let [node-map (Node/getAllNodesFrom single-cluster)
+         free-pool (FreePool. )
+         default-pool (DefaultPool. )]
+      (.init free-pool single-cluster node-map)
+      (.init default-pool single-cluster node-map)
+      (.addTopology default-pool topology1)
+      (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+      ;; The cluster should be full and have 4 slots used, but the topology would like 1 more
+      (is (= 4 (.size (.getUsedSlots single-cluster))))
+      (is (= "Running with fewer slots than requested (4/5)" (.get (.getStatusMap single-cluster) "topology1")))
+    )
+
+    (let [cluster (Cluster. (nimbus/standalone-nimbus) supers (.getAssignments single-cluster))
+         node-map (Node/getAllNodesFrom cluster)
+         free-pool (FreePool. )
+         default-pool (DefaultPool. )]
+      (.init free-pool cluster node-map)
+      (.init default-pool cluster node-map)
+      (.addTopology default-pool topology1)
+      (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+      ;; The cluster should now have 5 slots used
+      (is (= 5 (.size (.getUsedSlots cluster))))
+      (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+    )
+))
+
+
+(deftest test-default-pool-complex
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       default-pool (DefaultPool. )
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor11 (ed 11)
+       executor12 (ed 12)
+       executor13 (ed 13)
+       executor14 (ed 14)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   2
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"})
+       topology2 (TopologyDetails. "topology2"
+                    {TOPOLOGY-NAME "topology-name-2"}
+                    (StormTopology.)
+                    4
+                    {executor11 "spout11"
+                     executor12 "bolt12"
+                     executor13 "bolt13"
+                     executor14 "bolt14"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init default-pool cluster node-map)
+    (is (= true (.canAdd default-pool topology1)))
+    (.addTopology default-pool topology1)
+    (is (= true (.canAdd default-pool topology2)))
+    (.addTopology default-pool topology2)
+    ;;Only 1 node is in the default-pool because only one node was scheduled already
+    (is (= 4 (.slotsAvailable default-pool)))
+    (is (= 1 (.nodesAvailable default-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= nil (.getAssignmentById cluster "topology2")))
+    (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+    ;;We steal a node from the free pool to handle the extra
+    (is (= 8 (.slotsAvailable default-pool)))
+    (is (= 2 (.nodesAvailable default-pool)))
+    (is (= (* 3 4) (.slotsAvailable free-pool)))
+    (is (= 3 (.nodesAvailable free-pool)))
+    (is (= 2 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= 4 (.size (.getSlots (.getAssignmentById cluster "topology2")))))
+    (let [ns-count-1 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 1)
+          ns-count-3 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 3)
+          ns-count-4 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 4)
+          ns-count-5 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 5)]
+      (is (= 1 (._nodes ns-count-1)))
+      (is (= 4 (._slots ns-count-1)))
+      (is (= 1 (._nodes ns-count-3)))
+      (is (= 4 (._slots ns-count-3)))
+      (is (= 1 (._nodes ns-count-4)))
+      (is (= 4 (._slots ns-count-4)))
+      (is (= 2 (._nodes ns-count-5)))
+      (is (= 8 (._slots ns-count-5)))
+    )
+    (let [nodes (.takeNodesBySlots default-pool 3)]
+      (is (= 1 (.size nodes)))
+      (is (= 4 (Node/countFreeSlotsAlive nodes)))
+      (is (= 4 (Node/countTotalSlotsAlive nodes)))
+      (is (= 1 (.nodesAvailable default-pool)))
+      (is (= (* 1 4) (.slotsAvailable default-pool)))
+    )
+    (let [nodes (.takeNodes default-pool 3)] ;;Only 1 should be left
+      (is (= 1 (.size nodes)))
+      (is (= 4 (Node/countFreeSlotsAlive nodes)))
+      (is (= 4 (Node/countTotalSlotsAlive nodes)))
+      (is (= 0 (.nodesAvailable default-pool)))
+      (is (= 0 (.slotsAvailable default-pool)))
+    )
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))
+))
+
+(deftest test-isolated-pool-simple
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       isolated-pool (IsolatedPool. 5)
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor4 (ed 4)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"
+                    TOPOLOGY-ISOLATED-MACHINES 4}
+                   (StormTopology.)
+                   4
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"
+                    executor4 "bolt4"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init isolated-pool cluster node-map)
+    (is (= true (.canAdd isolated-pool topology1)))
+    (.addTopology isolated-pool topology1)
+    ;;Isolated topologies cannot have their resources stolen
+    (is (= 0 (.slotsAvailable isolated-pool)))
+    (is (= 0 (.nodesAvailable isolated-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+    (is (= 0 (.slotsAvailable isolated-pool)))
+    (is (= 0 (.nodesAvailable isolated-pool)))
+    (is (= (* 1 4) (.slotsAvailable free-pool)))
+    (is (= 1 (.nodesAvailable free-pool)))
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+      ;; 4 slots on 4 machines
+      (is (= 4 (.size assigned-slots)))
+      (is (= 4 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (is (= "Scheduled Isolated on 4 Nodes" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-isolated-pool-big-ask
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       isolated-pool (IsolatedPool. 5)
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor4 (ed 4)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"
+                    TOPOLOGY-ISOLATED-MACHINES 4}
+                   (StormTopology.)
+                   10
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"
+                    executor4 "bolt4"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init isolated-pool cluster node-map)
+    (is (= true (.canAdd isolated-pool topology1)))
+    (.addTopology isolated-pool topology1)
+    ;;Isolated topologies cannot have their resources stolen
+    (is (= 0 (.slotsAvailable isolated-pool)))
+    (is (= 0 (.nodesAvailable isolated-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+    (is (= 0 (.slotsAvailable isolated-pool)))
+    (is (= 0 (.nodesAvailable isolated-pool)))
+    (is (= (* 1 4) (.slotsAvailable free-pool)))
+    (is (= 1 (.nodesAvailable free-pool)))
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+      ;; 4 slots on 4 machines
+      (is (= 4 (.size assigned-slots)))
+      (is (= 4 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (is (= "Scheduled Isolated on 4 Nodes" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-isolated-pool-complex
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       isolated-pool (IsolatedPool. 5)
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor4 (ed 4)
+       executor11 (ed 11)
+       executor12 (ed 12)
+       executor13 (ed 13)
+       executor14 (ed 14)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   4
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"
+                    executor4 "bolt4"})
+       topology2 (TopologyDetails. "topology2"
+                    {TOPOLOGY-NAME "topology-name-2"
+                     TOPOLOGY-ISOLATED-MACHINES 2}
+                    (StormTopology.)
+                    4
+                    {executor11 "spout11"
+                     executor12 "bolt12"
+                     executor13 "bolt13"
+                     executor14 "bolt14"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init isolated-pool cluster node-map)
+    (is (= true (.canAdd isolated-pool topology1)))
+    (.addTopology isolated-pool topology1)
+    (is (= true (.canAdd isolated-pool topology2)))
+    (.addTopology isolated-pool topology2)
+    ;; nodes can be stolen from non-isolated topologies in the pool
+    (is (= 4 (.slotsAvailable isolated-pool)))
+    (is (= 1 (.nodesAvailable isolated-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= nil (.getAssignmentById cluster "topology2")))
+    (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+    ;;We steal 2 nodes from the free pool to handle the extra demand (but still only 1 node for the non-isolated topology)
+    (is (= 4 (.slotsAvailable isolated-pool)))
+    (is (= 1 (.nodesAvailable isolated-pool)))
+    (is (= (* 2 4) (.slotsAvailable free-pool)))
+    (is (= 2 (.nodesAvailable free-pool)))
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+      ;; 4 slots on 1 machine
+      (is (= 4 (.size assigned-slots)))
+      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology2"))]
+      ;; 4 slots on 2 machines
+      (is (= 4 (.size assigned-slots)))
+      (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+
+    (let [ns-count-1 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 1)
+          ns-count-3 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 3)
+          ns-count-4 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 4)
+          ns-count-5 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 5)]
+      (is (= 1 (._nodes ns-count-1)))
+      (is (= 4 (._slots ns-count-1)))
+      (is (= 1 (._nodes ns-count-3)))
+      (is (= 4 (._slots ns-count-3)))
+      (is (= 1 (._nodes ns-count-4)))
+      (is (= 4 (._slots ns-count-4)))
+      (is (= 1 (._nodes ns-count-5))) ;;Only 1 node can be stolen right now
+      (is (= 4 (._slots ns-count-5)))
+    )
+    (let [nodes (.takeNodesBySlots isolated-pool 3)]
+      (is (= 1 (.size nodes)))
+      (is (= 4 (Node/countFreeSlotsAlive nodes)))
+      (is (= 4 (Node/countTotalSlotsAlive nodes)))
+      (is (= 0 (.nodesAvailable isolated-pool)))
+      (is (= (* 0 4) (.slotsAvailable isolated-pool)))
+    )
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+      ;; topology1's node was just taken, so it is left with 0 slots on 0 machines
+      (is (= 0 (.size assigned-slots)))
+      (is (= 0 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology2"))]
+      ;; 4 slots on 2 machines
+      (is (= 4 (.size assigned-slots)))
+      (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (let [nodes (.takeNodes isolated-pool 3)] ;;Cannot steal from the isolated scheduler
+      (is (= 0 (.size nodes)))
+      (is (= 0 (Node/countFreeSlotsAlive nodes)))
+      (is (= 0 (Node/countTotalSlotsAlive nodes)))
+      (is (= 0 (.nodesAvailable isolated-pool)))
+      (is (= 0 (.slotsAvailable isolated-pool)))
+    )
+    (is (= "Scheduled Isolated on 1 Nodes" (.get (.getStatusMap cluster) "topology1")))
+    (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+))
+
+(deftest test-isolated-pool-complex-2
+  (let [supers (gen-supervisors 5)
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       free-pool (FreePool. )
+       ;;like before, but now the pool can only hold 2 nodes max. Don't go over.
+       isolated-pool (IsolatedPool. 2)
+       executor1 (ed 1)
+       executor2 (ed 2)
+       executor3 (ed 3)
+       executor4 (ed 4)
+       executor11 (ed 11)
+       executor12 (ed 12)
+       executor13 (ed 13)
+       executor14 (ed 14)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"}
+                   (StormTopology.)
+                   4
+                   {executor1 "spout1"
+                    executor2 "bolt1"
+                    executor3 "bolt2"
+                    executor4 "bolt4"})
+       topology2 (TopologyDetails. "topology2"
+                    {TOPOLOGY-NAME "topology-name-2"
+                     TOPOLOGY-ISOLATED-MACHINES 2}
+                    (StormTopology.)
+                    4
+                    {executor11 "spout11"
+                     executor12 "bolt12"
+                     executor13 "bolt13"
+                     executor14 "bolt14"})]
+    ;; assign one node so it is not in the pool
+    (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+    (.init free-pool cluster node-map)
+    (.init isolated-pool cluster node-map)
+    (is (= true (.canAdd isolated-pool topology1)))
+    (.addTopology isolated-pool topology1)
+    (is (= true (.canAdd isolated-pool topology2)))
+    (.addTopology isolated-pool topology2)
+    ;; nodes can be stolen from non-isolated topologies in the pool
+    (is (= 4 (.slotsAvailable isolated-pool)))
+    (is (= 1 (.nodesAvailable isolated-pool)))
+    (is (= (* 4 4) (.slotsAvailable free-pool)))
+    (is (= 4 (.nodesAvailable free-pool)))
+    (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+    (is (= nil (.getAssignmentById cluster "topology2")))
+    (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+    ;;We steal 1 node from the free pool and 1 from our own pool (topology1's node) to handle the extra demand
+    (is (= 0 (.slotsAvailable isolated-pool)))
+    (is (= 0 (.nodesAvailable isolated-pool)))
+    (is (= (* 3 4) (.slotsAvailable free-pool)))
+    (is (= 3 (.nodesAvailable free-pool)))
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+      ;; 0 slots on 0 machines
+      (is (= 0 (.size assigned-slots)))
+      (is (= 0 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology2"))]
+      ;; 4 slots on 2 machines
+      (is (= 4 (.size assigned-slots)))
+      (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+    )
+    (is (= "Max Nodes(2) for this user would be exceeded. 1 more nodes needed to run topology." (.get (.getStatusMap cluster) "topology1")))
+    (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+))
+
+(deftest test-multitenant-scheduler
+  (let [supers (gen-supervisors 10)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"
+                    TOPOLOGY-SUBMITTER-USER "userC"}
+                   (StormTopology.)
+                   4
+                   (mk-ed-map [["spout1" 0 5]
+                               ["bolt1" 5 10]
+                               ["bolt2" 10 15]
+                               ["bolt3" 15 20]]))
+       topology2 (TopologyDetails. "topology2"
+                    {TOPOLOGY-NAME "topology-name-2"
+                     TOPOLOGY-ISOLATED-MACHINES 2
+                     TOPOLOGY-SUBMITTER-USER "userA"}
+                    (StormTopology.)
+                    4
+                    (mk-ed-map [["spout11" 0 5]
+                                ["bolt12" 5 6]
+                                ["bolt13" 6 7]
+                                ["bolt14" 7 10]]))
+       topology3 (TopologyDetails. "topology3"
+                    {TOPOLOGY-NAME "topology-name-3"
+                     TOPOLOGY-ISOLATED-MACHINES 5
+                     TOPOLOGY-SUBMITTER-USER "userB"}
+                    (StormTopology.)
+                    10
+                    (mk-ed-map [["spout21" 0 10]
+                                ["bolt22" 10 20]
+                                ["bolt23" 20 30]
+                                ["bolt24" 30 40]]))
+       cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+       node-map (Node/getAllNodesFrom cluster)
+       topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+       conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+       scheduler (MultitenantScheduler.)]
+    (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
+    (.assign (.get node-map "super1") "topology2" (list (ed 5)) cluster)
+    (.prepare scheduler conf)
+    (.schedule scheduler topologies cluster)
+    (let [assignment (.getAssignmentById cluster "topology1")
+          assigned-slots (.getSlots assignment)
+          executors (.getExecutors assignment)]
+      ;; 4 slots on 1 machine, all executors assigned
+      (is (= 4 (.size assigned-slots)))
+      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+      (is (= 20 (.size executors)))
+    )
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+    (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+    (is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
+))
+
+
+(deftest test-multitenant-scheduler-bad-starting-state
+  (let [supers (gen-supervisors 10)
+       topology1 (TopologyDetails. "topology1" 
+                   {TOPOLOGY-NAME "topology-name-1"
+                    TOPOLOGY-SUBMITTER-USER "userC"}
+                   (StormTopology.)
+                   4
+                   (mk-ed-map [["spout1" 0 5]
+                               ["bolt1" 5 10]
+                               ["bolt2" 10 15]
+                               ["bolt3" 15 20]]))
+       topology2 (TopologyDetails. "topology2"
+                    {TOPOLOGY-NAME "topology-name-2"
+                     TOPOLOGY-ISOLATED-MACHINES 2
+                     TOPOLOGY-SUBMITTER-USER "userA"}
+                    (StormTopology.)
+                    4
+                    (mk-ed-map [["spout11" 0 5]
+                                ["bolt12" 5 6]
+                                ["bolt13" 6 7]
+                                ["bolt14" 7 10]]))
+       topology3 (TopologyDetails. "topology3"
+                    {TOPOLOGY-NAME "topology-name-3"
+                     TOPOLOGY-ISOLATED-MACHINES 5
+                     TOPOLOGY-SUBMITTER-USER "userB"}
+                    (StormTopology.)
+                    10
+                    (mk-ed-map [["spout21" 0 10]
+                                ["bolt22" 10 20]
+                                ["bolt23" 20 30]
+                                ["bolt24" 30 40]]))
+       existing-assignments {
+         "topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorDetails. 0 5) (WorkerSlot. "super1" 1)})
+         "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorDetails. 0 10) (WorkerSlot. "super1" 1)})
+       }
+       cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments)
+       topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+       conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+       scheduler (MultitenantScheduler.)]
+    (.prepare scheduler conf)
+    (.schedule scheduler topologies cluster)
+    (let [assignment (.getAssignmentById cluster "topology1")
+          assigned-slots (.getSlots assignment)
+          executors (.getExecutors assignment)]
+      ;; 4 slots on 1 machine, all executors assigned
+      (is (= 4 (.size assigned-slots)))
+      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+      (is (= 20 (.size executors)))
+    )
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+    (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+    (is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
+))
+
+

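Note on the scheduler tests above: the multitenant scheduler run exercised by test-multitenant-scheduler reduces to a short driver sequence. The sketch below is only a restatement of the calls made in that test, reusing bindings and helpers defined in the same test file (gen-supervisors, to-top-map, nimbus/standalone-nimbus, topology1/2/3); it is not standalone code, and the user names and pool sizes are the same illustrative values the test uses.

    ;; sketch of the driver sequence exercised by test-multitenant-scheduler
    (let [supers     (gen-supervisors 10)                   ;; 10 supervisors, 4 slots each
          cluster    (Cluster. (nimbus/standalone-nimbus) supers {})
          topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
          ;; userA and userB each get an isolated pool of 5 machines;
          ;; userC's topology1 has no pool and falls through to the shared default pool
          conf       {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
          scheduler  (MultitenantScheduler.)]
      (.prepare scheduler conf)
      (.schedule scheduler topologies cluster)
      ;; per-topology results are reported through the cluster status map
      (.get (.getStatusMap cluster) "topology1"))           ;; => "Fully Scheduled"
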
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj b/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
index ed21904..2d96b18 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
@@ -14,13 +14,16 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.security.auth.AuthUtils-test
-  (:import [backtype.storm.security.auth AuthUtils])
+  (:import [backtype.storm.security.auth AuthUtils IAutoCredentials])
   (:import [java.io IOException])
   (:import [javax.security.auth.login AppConfigurationEntry Configuration])
   (:import [org.mockito Mockito])
   (:use [clojure test])
+  (:use [backtype.storm bootstrap])
 )
 
+(bootstrap)
+
 (deftest test-throws-on-missing-section
   (is (thrown? IOException
     (AuthUtils/get (Mockito/mock Configuration) "bogus-section" "")))
@@ -61,5 +64,16 @@
       (is (not (nil? (AuthUtils/get conf section k))))
       (is (= (AuthUtils/get conf section k) expected))
     )
+  ))
+
+(deftest test-empty-auto-creds
+  (let [result (AuthUtils/GetAutoCredentials {})]
+    (is (.isEmpty result))
+  )
+)
+
+(deftest test-empty-creds-renewers
+  (let [result (AuthUtils/GetCredentialRenewers {})]
+    (is (.isEmpty result))
   )
 )

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj b/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj
new file mode 100644
index 0000000..ab54d82
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj
@@ -0,0 +1,40 @@
+(ns backtype.storm.security.auth.DefaultHttpCredentialsPlugin-test
+  (:use [clojure test])
+  (:import [javax.security.auth Subject])
+  (:import [javax.servlet.http HttpServletRequest])
+  (:import [org.mockito Mockito])
+  (:import [backtype.storm.security.auth DefaultHttpCredentialsPlugin
+            ReqContext SingleUserPrincipal])
+  )
+
+(deftest test-getUserName
+  (let [handler (doto (DefaultHttpCredentialsPlugin.) (.prepare {}))]
+    (testing "returns null when request is null"
+      (is (nil? (.getUserName handler nil))))
+
+    (testing "returns null when user principal is null"
+      (let [req (Mockito/mock HttpServletRequest)]
+        (is (nil? (.getUserName handler req)))))
+
+    (testing "returns null when user is blank"
+      (let [princ (SingleUserPrincipal. "")
+            req (Mockito/mock HttpServletRequest)]
+        (. (Mockito/when (. req getUserPrincipal))
+           thenReturn princ)
+        (is (nil? (.getUserName handler req)))))
+
+    (testing "returns correct user from requests principal"
+      (let [exp-name "Alice"
+            princ (SingleUserPrincipal. exp-name)
+            req (Mockito/mock HttpServletRequest)]
+        (. (Mockito/when (. req getUserPrincipal))
+           thenReturn princ)
+        (is (.equals exp-name (.getUserName handler req)))))))
+
+(deftest test-populate-req-context-noop-on-null-user
+  (let [req (Mockito/mock HttpServletRequest)
+        handler (doto (DefaultHttpCredentialsPlugin.) (.prepare {}))
+        expected-subj (Subject.)
+        context (ReqContext. expected-subj)]
+    (is (.equals expected-subj
+                 (-> handler (.populateContext context req) (.subject))))))

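Note on the new test file above: read together, test-getUserName and test-populate-req-context-noop-on-null-user describe the plugin's intended request flow. The snippet below is only a condensed sketch of those same calls under the same ns imports; "alice" is an illustrative user name, not a value asserted anywhere in the patch.

    ;; sketch: prepare the plugin, read the authenticated user off the request,
    ;; then populate Storm's per-request context for later authorization checks
    (let [plugin (doto (DefaultHttpCredentialsPlugin.) (.prepare {}))
          req    (Mockito/mock HttpServletRequest)]
      ;; stub the servlet container's authenticated principal
      (. (Mockito/when (. req getUserPrincipal))
         thenReturn (SingleUserPrincipal. "alice"))
      (.getUserName plugin req)                                   ;; => "alice"
      (.subject (.populateContext plugin (ReqContext. (Subject.)) req)))
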
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
index 7dcd86d..2eee963 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
@@ -14,28 +14,32 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.security.auth.ThriftClient-test
-  (:use [backtype.storm config])
+  (:use [backtype.storm config util])
   (:use [clojure test])
-  (:import [backtype.storm.security.auth ThriftClient])
+  (:import [backtype.storm.security.auth ThriftClient ThriftConnectionType])
   (:import [org.apache.thrift.transport TTransportException])
 )
 
 (deftest test-ctor-throws-if-port-invalid
-  (let [conf (read-default-config)
+  (let [conf (merge
+              (read-default-config)
+              {STORM-NIMBUS-RETRY-TIMES 0})
         timeout (Integer. 30)]
-    (is (thrown? java.lang.IllegalArgumentException
-      (ThriftClient. conf "bogushost" -1 timeout)))
-    (is (thrown? java.lang.IllegalArgumentException
-        (ThriftClient. conf "bogushost" 0 timeout)))
+    (is (thrown-cause? java.lang.IllegalArgumentException
+      (ThriftClient. conf ThriftConnectionType/DRPC "bogushost" (int -1) timeout)))
+    (is (thrown-cause? java.lang.IllegalArgumentException
+        (ThriftClient. conf ThriftConnectionType/DRPC "bogushost" (int 0) timeout)))
   )
 )
 
 (deftest test-ctor-throws-if-host-not-set
-  (let [conf (read-default-config)
+  (let [conf (merge
+              (read-default-config)
+              {STORM-NIMBUS-RETRY-TIMES 0})
         timeout (Integer. 60)]
-    (is (thrown? TTransportException
-         (ThriftClient. conf "" 4242 timeout)))
-    (is (thrown? IllegalArgumentException
-        (ThriftClient. conf nil 4242 timeout)))
+    (is (thrown-cause? TTransportException
+         (ThriftClient. conf ThriftConnectionType/DRPC "" (int 4242) timeout)))
+    (is (thrown-cause? IllegalArgumentException
+        (ThriftClient. conf ThriftConnectionType/DRPC nil (int 4242) timeout)))
   )
 )

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
index 6213d4f..c8ed70e 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
@@ -16,14 +16,16 @@
 (ns backtype.storm.security.auth.ThriftServer-test
   (:use [backtype.storm config])
   (:use [clojure test])
-  (:import [backtype.storm.security.auth ThriftServer])
+  (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType])
   (:import [org.apache.thrift.transport TTransportException])
 )
 
 (deftest test-stop-checks-for-null
-  (let [server (ThriftServer. (read-default-config) nil 12345)]
+  (let [server (ThriftServer. (read-default-config) nil 
+                              ThriftConnectionType/DRPC)]
     (.stop server)))
 
 (deftest test-isServing-checks-for-null
-  (let [server (ThriftServer. (read-default-config) nil 12345)]
+  (let [server (ThriftServer. (read-default-config) nil 
+                              ThriftConnectionType/DRPC)]
     (is (not (.isServing server)))))