You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:06:58 UTC
[03/14] STORM-216: Added Authentication and Authorization.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj
index 987429b..ba94ab1 100644
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@ -16,6 +16,14 @@
(ns backtype.storm.cluster-test
(:import [java.util Arrays])
(:import [backtype.storm.daemon.common Assignment StormBase SupervisorInfo])
+ (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids])
+ (:import [org.mockito Mockito])
+ (:import [org.mockito.exceptions.base MockitoAssertionError])
+ (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
+ (:import [backtype.storm.utils Utils TestUtils ZookeeperAuthInfo])
+ (:require [backtype.storm [zookeeper :as zk]])
+ (:require [conjure.core])
+ (:use [conjure core])
(:use [clojure test])
(:use [backtype.storm cluster config util testing]))
@@ -25,7 +33,8 @@
STORM-ZOOKEEPER-SERVERS ["localhost"]}))
(defn mk-state
- ([zk-port] (mk-distributed-cluster-state (mk-config zk-port)))
+ ([zk-port] (let [conf (mk-config zk-port)]
+ (mk-distributed-cluster-state conf :auth-conf conf)))
([zk-port cb]
(let [ret (mk-state zk-port)]
(.register ret cb)
@@ -36,16 +45,16 @@
(deftest test-basics
(with-inprocess-zookeeper zk-port
(let [state (mk-state zk-port)]
- (.set-data state "/root" (barr 1 2 3))
+ (.set-data state "/root" (barr 1 2 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1 2 3) (.get-data state "/root" false)))
(is (= nil (.get-data state "/a" false)))
- (.set-data state "/root/a" (barr 1 2))
- (.set-data state "/root" (barr 1))
+ (.set-data state "/root/a" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
+ (.set-data state "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state "/root" false)))
(is (Arrays/equals (barr 1 2) (.get-data state "/root/a" false)))
- (.set-data state "/a/b/c/d" (barr 99))
+ (.set-data state "/a/b/c/d" (barr 99) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 99) (.get-data state "/a/b/c/d" false)))
- (.mkdirs state "/lalala")
+ (.mkdirs state "/lalala" ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= [] (.get-children state "/lalala" false)))
(is (= #{"root" "a" "lalala"} (set (.get-children state "/" false))))
(.delete-node state "/a")
@@ -58,7 +67,7 @@
(with-inprocess-zookeeper zk-port
(let [state1 (mk-state zk-port)
state2 (mk-state zk-port)]
- (.set-data state1 "/root" (barr 1))
+ (.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state1 "/root" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/root" false)))
(.delete-node state2 "/root")
@@ -73,7 +82,7 @@
(let [state1 (mk-state zk-port)
state2 (mk-state zk-port)
state3 (mk-state zk-port)]
- (.set-ephemeral-node state1 "/a" (barr 1))
+ (.set-ephemeral-node state1 "/a" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
(.close state3)
@@ -111,37 +120,37 @@
state1 (mk-state zk-port state1-cb)
[state2-last-cb state2-cb] (mk-callback-tester)
state2 (mk-state zk-port state2-cb)]
- (.set-data state1 "/root" (barr 1))
+ (.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(.get-data state2 "/root" true)
(is (= nil @state1-last-cb))
(is (= nil @state2-last-cb))
- (.set-data state2 "/root" (barr 2))
+ (.set-data state2 "/root" (barr 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb)))
(is (= nil @state1-last-cb))
- (.set-data state2 "/root" (barr 3))
+ (.set-data state2 "/root" (barr 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= nil @state2-last-cb))
(.get-data state2 "/root" true)
(.get-data state2 "/root" false)
(.delete-node state1 "/root")
(is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb)))
(.get-data state2 "/root" true)
- (.set-ephemeral-node state1 "/root" (barr 1 2 3 4))
+ (.set-ephemeral-node state1 "/root" (barr 1 2 3 4) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type :node-created :path "/root"} (read-and-reset! state2-last-cb)))
(.get-children state1 "/" true)
- (.set-data state2 "/a" (barr 9))
+ (.set-data state2 "/a" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= nil @state2-last-cb))
(is (= {:type :node-children-changed :path "/"} (read-and-reset! state1-last-cb)))
(.get-data state2 "/root" true)
- (.set-ephemeral-node state1 "/root" (barr 1 2))
+ (.set-ephemeral-node state1 "/root" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb)))
- (.mkdirs state1 "/ccc")
+ (.mkdirs state1 "/ccc" ZooDefs$Ids/OPEN_ACL_UNSAFE)
(.get-children state1 "/ccc" true)
(.get-data state2 "/ccc/b" true)
- (.set-data state2 "/ccc/b" (barr 8))
+ (.set-data state2 "/ccc/b" (barr 8) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type :node-created :path "/ccc/b"} (read-and-reset! state2-last-cb)))
(is (= {:type :node-children-changed :path "/ccc"} (read-and-reset! state1-last-cb)))
@@ -150,7 +159,7 @@
(.close state1)
(is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb)))
- (.set-data state2 "/root2" (barr 9))
+ (.set-data state2 "/root2" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type :node-created :path "/root2"} (read-and-reset! state2-last-cb)))
(.close state2)
)))
@@ -161,8 +170,8 @@
(let [state (mk-storm-state zk-port)
assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
- base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {})
- base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {})]
+ base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "")
+ base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "")]
(is (= [] (.assignments state nil)))
(.set-assignment! state "storm1" assignment1)
(is (= assignment1 (.assignment-info state "storm1" nil)))
@@ -186,6 +195,11 @@
(is (= base2 (.storm-base state "storm2" nil)))
(is (= #{"storm2"} (set (.active-storms state))))
+ (is (nil? (.credentials state "storm1" nil)))
+ (.set-credentials! state "storm1" {"a" "a"} {})
+ (is (= {"a" "a"} (.credentials state "storm1" nil)))
+ (.set-credentials! state "storm1" {"b" "b"} {})
+ (is (= {"b" "b"} (.credentials state "storm1" nil)))
;; TODO add tests for task info and task heartbeat setting and getting
(.disconnect state)
@@ -240,9 +254,44 @@
(.disconnect state1)
)))
+
+
+(deftest test-cluster-authentication
+ (with-inprocess-zookeeper zk-port
+ (let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
+ conf (merge
+ (mk-config zk-port)
+ {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
+ STORM-ZOOKEEPER-SESSION-TIMEOUT 10
+ STORM-ZOOKEEPER-RETRY-INTERVAL 5
+ STORM-ZOOKEEPER-RETRY-TIMES 2
+ STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
+ STORM-ZOOKEEPER-AUTH-SCHEME "digest"
+ STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
+ (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
+ (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
+ (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
+ (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
+ (is (nil?
+ (try
+ (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
+ (catch MockitoAssertionError e
+ e)))))))
+
(deftest test-storm-state-callbacks
;; TODO finish
)
-
-
+(deftest test-cluster-state-default-acls
+ (testing "The default ACLs are empty."
+ (stubbing [zk/mkdirs nil
+ zk/mk-client (reify CuratorFramework (^void close [this] nil))]
+ (mk-distributed-cluster-state {})
+ (verify-call-times-for zk/mkdirs 1)
+ (verify-first-call-args-for-indices zk/mkdirs [2] nil))
+ (stubbing [mk-distributed-cluster-state nil
+ register nil
+ mkdirs nil]
+ (mk-storm-cluster-state {})
+ (verify-call-times-for mk-distributed-cluster-state 1)
+ (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 01f788b..2b7f96e 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -84,6 +84,17 @@
(is (thrown-cause? java.lang.IllegalArgumentException
(.validateField validator "test" 42)))))
+(deftest test-positive-integer-validator
+ (let [validator ConfigValidation/PositiveIntegerValidator]
+ (doseq [x [42.42 -32 0 -0 "Forty-two"]]
+ (is (thrown-cause? java.lang.IllegalArgumentException
+ (.validateField validator "test" x))))
+
+ (doseq [x [42 4294967296 1 nil]]
+ (is (nil? (try
+ (.validateField validator "test" x)
+ (catch Exception e e)))))))
+
(deftest test-worker-childopts-is-string-or-string-list
(let [pass-cases [nil "some string" ["some" "string" "list"]]]
(testing "worker.childopts validates"
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/logviewer_test.clj b/storm-core/test/clj/backtype/storm/logviewer_test.clj
new file mode 100644
index 0000000..37e63b9
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/logviewer_test.clj
@@ -0,0 +1,187 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.logviewer-test
+ (:use [backtype.storm config util])
+ (:require [backtype.storm.daemon [logviewer :as logviewer]
+ [supervisor :as supervisor]])
+ (:require [conjure.core])
+ (:use [clojure test])
+ (:use [conjure core])
+ (:import [org.mockito Mockito]))
+
+(defmulti mk-mock-File #(:type %))
+
+(defmethod mk-mock-File :file [{file-name :name mtime :mtime
+ :or {file-name "afile" mtime 1}}]
+ (let [mockFile (Mockito/mock java.io.File)]
+ (. (Mockito/when (.getName mockFile)) thenReturn file-name)
+ (. (Mockito/when (.lastModified mockFile)) thenReturn mtime)
+ (. (Mockito/when (.isFile mockFile)) thenReturn true)
+ (. (Mockito/when (.getCanonicalPath mockFile))
+ thenReturn (str "/mock/canonical/path/to/" file-name))
+ mockFile))
+
+(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime
+ :or {dir-name "adir" mtime 1}}]
+ (let [mockDir (Mockito/mock java.io.File)]
+ (. (Mockito/when (.getName mockDir)) thenReturn dir-name)
+ (. (Mockito/when (.lastModified mockDir)) thenReturn mtime)
+ (. (Mockito/when (.isFile mockDir)) thenReturn false)
+ mockDir))
+
+(deftest test-mk-FileFilter-for-log-cleanup
+ (testing "log file filter selects the correct log files for purge"
+ (let [now-millis (current-time-millis)
+ conf {LOGVIEWER-CLEANUP-AGE-MINS 60
+ LOGVIEWER-CLEANUP-INTERVAL-SECS 300}
+ cutoff-millis (logviewer/cleanup-cutoff-age-millis conf now-millis)
+ old-mtime-millis (- cutoff-millis 500)
+ new-mtime-millis (+ cutoff-millis 500)
+ matching-files (map #(mk-mock-File %)
+ [{:name "oldlog-1-2-worker-3.log"
+ :type :file
+ :mtime old-mtime-millis}
+ {:name "oldlog-1-2-worker-3.log.8"
+ :type :file
+ :mtime old-mtime-millis}
+ {:name "foobar*_topo-1-24242-worker-2834238.log"
+ :type :file
+ :mtime old-mtime-millis}])
+ excluded-files (map #(mk-mock-File %)
+ [{:name "oldlog-1-2-worker-.log"
+ :type :file
+ :mtime old-mtime-millis}
+ {:name "olddir-1-2-worker.log"
+ :type :directory
+ :mtime old-mtime-millis}
+ {:name "newlog-1-2-worker.log"
+ :type :file
+ :mtime new-mtime-millis}
+ {:name "some-old-file.txt"
+ :type :file
+ :mtime old-mtime-millis}
+ {:name "metadata"
+ :type :directory
+ :mtime old-mtime-millis}
+ {:name "newdir-1-2-worker.log"
+ :type :directory
+ :mtime new-mtime-millis}
+ {:name "newdir"
+ :type :directory
+ :mtime new-mtime-millis}
+ ])
+ file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)]
+ (is (every? #(.accept file-filter %) matching-files))
+ (is (not-any? #(.accept file-filter %) excluded-files))
+ )))
+
+(deftest test-get-log-root->files-map
+ (testing "returns map of root name to list of files"
+ (let [files (vec (map #(java.io.File. %) ["log-1-2-worker-3.log"
+ "log-1-2-worker-3.log.1"
+ "log-2-4-worker-6.log.1"]))
+ expected {"log-1-2-worker-3" #{(files 0) (files 1)}
+ "log-2-4-worker-6" #{(files 2)}}]
+ (is (= expected (logviewer/get-log-root->files-map files))))))
+
+(deftest test-identify-worker-log-files
+ (testing "Does not include metadata file when there are any log files that
+ should not be cleaned up"
+ (let [cutoff-millis 2000
+ old-logFile (mk-mock-File {:name "mock-1-1-worker-1.log.1"
+ :type :file
+ :mtime (- cutoff-millis 1000)})
+ mock-metaFile (mk-mock-File {:name "mock-1-1-worker-1.yaml"
+ :type :file
+ :mtime 1})
+ new-logFile (mk-mock-File {:name "mock-1-1-worker-1.log"
+ :type :file
+ :mtime (+ cutoff-millis 1000)})
+ exp-id "id12345"
+ exp-user "alice"
+ expected {exp-id {:owner exp-user
+ :files #{old-logFile}}}]
+ (stubbing [supervisor/read-worker-heartbeats nil
+ logviewer/get-metadata-file-for-log-root-name mock-metaFile
+ read-dir-contents [(.getName old-logFile) (.getName new-logFile)]
+ logviewer/get-worker-id-from-metadata-file exp-id
+ logviewer/get-topo-owner-from-metadata-file exp-user]
+ (is (= expected (logviewer/identify-worker-log-files [old-logFile] "/tmp/")))))))
+
+(deftest test-get-dead-worker-files-and-owners
+ (testing "removes any files of workers that are still alive"
+ (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
+ id->hb {"42" {:time-secs 1}}
+ now-secs 2
+ log-files #{:expected-file :unexpected-file}
+ exp-owner "alice"]
+ (stubbing [logviewer/identify-worker-log-files {"42" {:owner exp-owner
+ :files #{:unexpected-file}}
+ "007" {:owner exp-owner
+ :files #{:expected-file}}}
+ logviewer/get-topo-owner-from-metadata-file "alice"
+ supervisor/read-worker-heartbeats id->hb]
+ (is (= [{:owner exp-owner :files #{:expected-file}}]
+ (logviewer/get-dead-worker-files-and-owners conf now-secs log-files "/tmp/")))))))
+
+(deftest test-cleanup-fn
+ (testing "cleanup function removes file as user when one is specified"
+ (let [exp-user "mock-user"
+ mockfile1 (mk-mock-File {:name "file1" :type :file})
+ mockfile2 (mk-mock-File {:name "file2" :type :file})
+ mockfile3 (mk-mock-File {:name "file3" :type :file})
+ mockyaml (mk-mock-File {:name "foo.yaml" :type :file})
+ exp-cmd (str "rmr /mock/canonical/path/to/" (.getName mockfile3))]
+ (stubbing [logviewer/select-files-for-cleanup
+ [(mk-mock-File {:name "throwaway" :type :file})]
+ logviewer/get-dead-worker-files-and-owners
+ [{:owner nil :files #{mockfile1}}
+ {:files #{mockfile2}}
+ {:owner exp-user :files #{mockfile3 mockyaml}}]
+ supervisor/worker-launcher nil
+ rmr nil]
+ (logviewer/cleanup-fn! "/tmp/")
+ (verify-call-times-for supervisor/worker-launcher 1)
+ (verify-first-call-args-for-indices supervisor/worker-launcher
+ [1 2] exp-user exp-cmd)
+ (verify-call-times-for rmr 3)
+ (verify-nth-call-args-for 1 rmr (.getCanonicalPath mockfile1))
+ (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))
+ (verify-nth-call-args-for 3 rmr (.getCanonicalPath mockyaml))))))
+
+(deftest test-authorized-log-user
+ (testing "allow cluster admin"
+ (let [conf {NIMBUS-ADMINS ["alice"]}]
+ (stubbing [logviewer/get-log-user-whitelist []]
+ (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)))))
+
+ (testing "ignore any cluster-set topology.users"
+ (let [conf {TOPOLOGY-USERS ["alice"]}]
+ (stubbing [logviewer/get-log-user-whitelist []]
+ (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" conf))))))
+
+ (testing "allow cluster logs user"
+ (let [conf {LOGS-USERS ["alice"]}]
+ (stubbing [logviewer/get-log-user-whitelist []]
+ (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)))))
+
+ (testing "allow whitelisted topology user"
+ (stubbing [logviewer/get-log-user-whitelist ["alice"]]
+ (is (logviewer/authorized-log-user? "alice" "non-blank-fname" {}))))
+
+ (testing "disallow user not in nimbus admin, topo user, logs user, or whitelist"
+ (stubbing [logviewer/get-log-user-whitelist []]
+ (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" {}))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index 31e69e8..8534c82 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -30,8 +30,7 @@
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
- }]
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}]
(let [topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
{"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 1e6554a..c902119 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -15,12 +15,15 @@
;; limitations under the License.
(ns backtype.storm.nimbus-test
(:use [clojure test])
+ (:require [backtype.storm [util :as util]])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
-
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:import [backtype.storm.scheduler INimbus])
+ (:import [backtype.storm.generated Credentials])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
+ (:require [conjure.core])
+ (:use [conjure core])
)
(bootstrap)
@@ -159,6 +162,8 @@
(is (not-nil? ((:executor->start-time-secs assignment) e))))
))
+
+
(deftest test-bogusId
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
@@ -167,6 +172,7 @@
(is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopologyInfo nimbus "bogus-id")))
+ (is (thrown? NotAliveException (.uploadNewCredentials nimbus "bogus-id" (Credentials.))))
)))
(deftest test-assignment
@@ -724,10 +730,77 @@
)))
+
+(defn check-for-collisions [state]
+ (log-message "Checking for collision")
+ (let [assignments (.assignments state nil)]
+ (log-message "Assignemts: " assignments)
+ (let [id->node->ports (into {} (for [id assignments
+ :let [executor->node+port (:executor->node+port (.assignment-info state id nil))
+ node+ports (set (.values executor->node+port))
+ node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [[node port] node+ports] {node [port]}))]]
+ {id node->ports}))
+ _ (log-message "id->node->ports: " id->node->ports)
+ all-nodes (apply merge-with (fn [a b]
+ (let [ret (concat a b)]
+ (log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret))
+ (is (apply distinct? ret))
+ (distinct ret)))
+ (.values id->node->ports))]
+)))
+
+(deftest test-rebalance-constrained-cluster
+ (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 4
+ :daemon-conf {SUPERVISOR-ENABLE false
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+ {}))
+ (bind topology3 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (submit-local-topology (:nimbus cluster)
+ "test"
+ {TOPOLOGY-WORKERS 3
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
+ (submit-local-topology (:nimbus cluster)
+ "test2"
+ {TOPOLOGY-WORKERS 3
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
+ (submit-local-topology (:nimbus cluster)
+ "test3"
+ {TOPOLOGY-WORKERS 3
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
+
+ (advance-cluster-time cluster 31)
+
+ (check-for-collisions state)
+ (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.)
+ (.set_num_workers 4)
+ (.set_wait_secs 0)
+ ))
+
+ (advance-cluster-time cluster 11)
+ (check-for-collisions state)
+
+ (advance-cluster-time cluster 30)
+ (check-for-collisions state)
+ )))
+
+
(deftest test-submit-invalid
(with-simulated-time-local-cluster [cluster
:daemon-conf {SUPERVISOR-ENABLE false
- TOPOLOGY-ACKER-EXECUTORS 0}]
+ TOPOLOGY-ACKER-EXECUTORS 0
+ NIMBUS-EXECUTORS-PER-TOPOLOGY 8
+ NIMBUS-SLOTS-PER-TOPOLOGY 8}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 0 :conf {TOPOLOGY-TASKS 1})}
@@ -747,7 +820,31 @@
"test/aaa"
{}
topology)))
- )))
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
+ :parallelism-hint 16
+ :conf {TOPOLOGY-TASKS 16})}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (is (thrown? InvalidTopologyException
+ (submit-local-topology (:nimbus cluster)
+ "test"
+ {TOPOLOGY-WORKERS 3}
+ topology)))
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
+ :parallelism-hint 5
+ :conf {TOPOLOGY-TASKS 5})}
+ {}))
+ (is (thrown? InvalidTopologyException
+ (submit-local-topology (:nimbus cluster)
+ "test"
+ {TOPOLOGY-WORKERS 16}
+ topology)))
+ (is (nil? (submit-local-topology (:nimbus cluster)
+ "test"
+ {TOPOLOGY-WORKERS 8}
+ topology))))))
(deftest test-cleans-corrupt
(with-inprocess-zookeeper zk-port
@@ -775,13 +872,13 @@
(.disconnect cluster-state)
))))
-(deftest test-no-overlapping-slots
- ;; test that same node+port never appears across 2 assignments
- )
+;(deftest test-no-overlapping-slots
+; ;; test that same node+port never appears across 2 assignments
+; )
-(deftest test-stateless
- ;; test that nimbus can die and restart without any problems
- )
+;(deftest test-stateless
+; ;; test that nimbus can die and restart without any problems
+; )
(deftest test-clean-inbox
"Tests that the inbox correctly cleans jar files."
@@ -816,11 +913,228 @@
(assert-files-in-dir [])
))))
+(deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
+ (with-local-cluster [cluster
+ :daemon-conf {NIMBUS-AUTHORIZER
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
+ (let [
+ nimbus (:nimbus cluster)
+ topology (thrift/mk-topology {} {})
+ ]
+ (is (thrown? AuthorizationException
+ (submit-local-topology-with-opts nimbus "mystorm" {} topology
+ (SubmitOptions. TopologyInitialStatus/INACTIVE))
+ ))
+ )
+ )
+)
+
+(deftest test-nimbus-iface-methods-check-authorization
+ (with-local-cluster [cluster
+ :daemon-conf {NIMBUS-AUTHORIZER
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
+ (let [
+ nimbus (:nimbus cluster)
+ topology (thrift/mk-topology {} {})
+ ]
+ ; Fake good authorization as part of setup.
+ (mocking [nimbus/check-authorization!]
+ (submit-local-topology-with-opts nimbus "test" {} topology
+ (SubmitOptions. TopologyInitialStatus/INACTIVE))
+ )
+ (stubbing [nimbus/storm-active? true]
+ (is (thrown? AuthorizationException
+ (.rebalance nimbus "test" (RebalanceOptions.))
+ ))
+ )
+ (is (thrown? AuthorizationException
+ (.activate nimbus "test")
+ ))
+ (is (thrown? AuthorizationException
+ (.deactivate nimbus "test")
+ ))
+ )
+ )
+)
+
+(deftest test-nimbus-check-authorization-params
+ (with-local-cluster [cluster
+ :daemon-conf {NIMBUS-AUTHORIZER "backtype.storm.security.auth.authorizer.NoopAuthorizer"}]
+ (let [nimbus (:nimbus cluster)
+ topology-name "test-nimbus-check-autho-params"
+ topology (thrift/mk-topology {} {})]
+
+ (submit-local-topology-with-opts nimbus topology-name {} topology
+ (SubmitOptions. TopologyInitialStatus/INACTIVE))
+
+ (let [expected-name topology-name
+ expected-conf {TOPOLOGY-NAME expected-name
+ :foo :bar}]
+
+ (testing "getTopologyConf calls check-authorization! with the correct parameters."
+ (let [expected-operation "getTopologyConf"]
+ (stubbing [nimbus/check-authorization! nil
+ nimbus/try-read-storm-conf expected-conf
+ util/to-json nil]
+ (try
+ (.getTopologyConf nimbus "fake-id")
+ (catch NotAliveException e)
+ (finally
+ (verify-first-call-args-for-indices
+ nimbus/check-authorization!
+ [1 2 3] expected-name expected-conf expected-operation)
+ (verify-first-call-args-for util/to-json expected-conf))))))
+
+ (testing "getTopology calls check-authorization! with the correct parameters."
+ (let [expected-operation "getTopology"]
+ (stubbing [nimbus/check-authorization! nil
+ nimbus/try-read-storm-conf expected-conf
+ nimbus/try-read-storm-topology nil
+ system-topology! nil]
+ (try
+ (.getTopology nimbus "fake-id")
+ (catch NotAliveException e)
+ (finally
+ (verify-first-call-args-for-indices
+ nimbus/check-authorization!
+ [1 2 3] expected-name expected-conf expected-operation)
+ (verify-first-call-args-for-indices
+ system-topology! [0] expected-conf))))))
+
+ (testing "getUserTopology calls check-authorization with the correct parameters."
+ (let [expected-operation "getUserTopology"]
+ (stubbing [nimbus/check-authorization! nil
+ nimbus/try-read-storm-conf expected-conf
+ nimbus/try-read-storm-topology nil]
+ (try
+ (.getUserTopology nimbus "fake-id")
+ (catch NotAliveException e)
+ (finally
+ (verify-first-call-args-for-indices
+ nimbus/check-authorization!
+ [1 2 3] expected-name expected-conf expected-operation)
+ (verify-first-call-args-for-indices
+ nimbus/try-read-storm-topology [0] expected-conf))))))))))
+
+(deftest test-nimbus-iface-getTopology-methods-throw-correctly
+ (with-local-cluster [cluster]
+ (let [
+ nimbus (:nimbus cluster)
+ id "bogus ID"
+ ]
+ (is (thrown? NotAliveException (.getTopology nimbus id)))
+ (try
+ (.getTopology nimbus id)
+ (catch NotAliveException e
+ (is (= id (.get_msg e)))
+ )
+ )
+
+ (is (thrown? NotAliveException (.getTopologyConf nimbus id)))
+ (try (.getTopologyConf nimbus id)
+ (catch NotAliveException e
+ (is (= id (.get_msg e)))
+ )
+ )
+
+ (is (thrown? NotAliveException (.getTopologyInfo nimbus id)))
+ (try (.getTopologyInfo nimbus id)
+ (catch NotAliveException e
+ (is (= id (.get_msg e)))
+ )
+ )
+
+ (is (thrown? NotAliveException (.getUserTopology nimbus id)))
+ (try (.getUserTopology nimbus id)
+ (catch NotAliveException e
+ (is (= id (.get_msg e)))
+ )
+ )
+ )
+ )
+)
+
+(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
+ (with-local-cluster [cluster]
+ (let [
+ nimbus (:nimbus cluster)
+ bogus-secs 42
+ bogus-type "bogusType"
+ bogus-bases {
+ "1" nil
+ "2" {:launch-time-secs bogus-secs
+ :storm-name "id2-name"
+ :status {:type bogus-type}}
+ "3" nil
+ "4" {:launch-time-secs bogus-secs
+ :storm-name "id4-name"
+ :status {:type bogus-type}}
+ }
+ ]
+ (stubbing [topology-bases bogus-bases]
+ (let [topos (.get_topologies (.getClusterInfo nimbus))]
+ ; The number of topologies in the summary is correct.
+ (is (= (count
+ (filter (fn [b] (second b)) bogus-bases)) (count topos)))
+ ; Each topology present has a valid name.
+ (is (empty?
+ (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos)))
+ ; The topologies are those with valid bases.
+ (is (empty?
+ (filter (fn [t]
+ (or
+ (nil? t)
+ (not (number? (read-string (.get_id t))))
+ (odd? (read-string (.get_id t)))
+ )) topos)))
+ )
+ )
+ )
+ )
+)
+
+(deftest test-defserverfn-numbus-iface-instance
+ (test-nimbus-iface-submitTopologyWithOpts-checks-authorization)
+ (test-nimbus-iface-methods-check-authorization)
+ (test-nimbus-iface-getTopology-methods-throw-correctly)
+ (test-nimbus-iface-getClusterInfo-filters-topos-without-bases)
+)
+
+(deftest test-nimbus-data-acls
+ (testing "nimbus-data uses correct ACLs"
+ (let [scheme "digest"
+ digest "storm:thisisapoorpassword"
+ auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
+ STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
+ expected-acls nimbus/NIMBUS-ZK-ACLS
+ fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
+ (stubbing [mk-authorization-handler nil
+ cluster/mk-storm-cluster-state nil
+ nimbus/file-cache-map nil
+ uptime-computer nil
+ new-instance nil
+ mk-timer nil
+ nimbus/mk-scheduler nil]
+ (nimbus/nimbus-data auth-conf fake-inimbus)
+ (verify-call-times-for cluster/mk-storm-cluster-state 1)
+ (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
+ expected-acls)))))
+
+(deftest test-file-bogus-download
+ (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
+ (let [nimbus (:nimbus cluster)]
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus nil)))
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "")))
+ (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "/bogus-path/foo")))
+ )))
+
(deftest test-validate-topo-config-on-submit
(with-local-cluster [cluster]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology {} {})
- bad-config {"topology.workers" "3"}]
- (is (thrown-cause? InvalidTopologyException
- (submit-local-topology-with-opts nimbus "test" bad-config topology
- (SubmitOptions.)))))))
+ bad-config {"topology.isolate.machines" "2"}]
+ ; Fake good authorization as part of setup.
+ (mocking [nimbus/check-authorization!]
+ (is (thrown-cause? InvalidTopologyException
+ (submit-local-topology-with-opts nimbus "test" bad-config topology
+ (SubmitOptions.))))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
new file mode 100644
index 0000000..4e79240
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
@@ -0,0 +1,737 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.scheduler.multitenant-scheduler-test
+ (:use [clojure test])
+ (:use [backtype.storm bootstrap config testing])
+ (:require [backtype.storm.daemon [nimbus :as nimbus]])
+ (:import [backtype.storm.generated StormTopology])
+ (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
+ SchedulerAssignmentImpl Topologies TopologyDetails])
+ (:import [backtype.storm.scheduler.multitenant Node NodePool FreePool DefaultPool
+ IsolatedPool MultitenantScheduler]))
+
+(bootstrap)
+
+(defn gen-supervisors [count]
+ (into {} (for [id (range count)
+ :let [supervisor (SupervisorDetails. (str "super" id) (str "host" id) (list ) (map int (list 1 2 3 4)))]]
+ {(.getId supervisor) supervisor})))
+
+(defn to-top-map [topologies]
+ (into {} (for [top topologies] {(.getId top) top})))
+
+(defn ed [id] (ExecutorDetails. (int id) (int id)))
+
+(defn mk-ed-map [arg]
+ (into {}
+ (for [[name start end] arg]
+ (into {}
+ (for [at (range start end)]
+ {(ed at) name})))))
+
+(deftest test-node
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)]
+ (is (= 5 (.size node-map)))
+ (let [node (.get node-map "super0")]
+ (is (= "super0" (.getId node)))
+ (is (= true (.isAlive node)))
+ (is (= 0 (.size (.getRunningTopologies node))))
+ (is (= true (.isTotallyFree node)))
+ (is (= 4 (.totalSlotsFree node)))
+ (is (= 0 (.totalSlotsUsed node)))
+ (is (= 4 (.totalSlots node)))
+ (.assign node "topology1" (list (ExecutorDetails. 1 1)) cluster)
+ (is (= 1 (.size (.getRunningTopologies node))))
+ (is (= false (.isTotallyFree node)))
+ (is (= 3 (.totalSlotsFree node)))
+ (is (= 1 (.totalSlotsUsed node)))
+ (is (= 4 (.totalSlots node)))
+ (.assign node "topology1" (list (ExecutorDetails. 2 2)) cluster)
+ (is (= 1 (.size (.getRunningTopologies node))))
+ (is (= false (.isTotallyFree node)))
+ (is (= 2 (.totalSlotsFree node)))
+ (is (= 2 (.totalSlotsUsed node)))
+ (is (= 4 (.totalSlots node)))
+ (.assign node "topology2" (list (ExecutorDetails. 1 1)) cluster)
+ (is (= 2 (.size (.getRunningTopologies node))))
+ (is (= false (.isTotallyFree node)))
+ (is (= 1 (.totalSlotsFree node)))
+ (is (= 3 (.totalSlotsUsed node)))
+ (is (= 4 (.totalSlots node)))
+ (.assign node "topology2" (list (ExecutorDetails. 2 2)) cluster)
+ (is (= 2 (.size (.getRunningTopologies node))))
+ (is (= false (.isTotallyFree node)))
+ (is (= 0 (.totalSlotsFree node)))
+ (is (= 4 (.totalSlotsUsed node)))
+ (is (= 4 (.totalSlots node)))
+ (.freeAllSlots node cluster)
+ (is (= 0 (.size (.getRunningTopologies node))))
+ (is (= true (.isTotallyFree node)))
+ (is (= 4 (.totalSlotsFree node)))
+ (is (= 0 (.totalSlotsUsed node)))
+ (is (= 4 (.totalSlots node)))
+ )))
+
+(deftest test-free-pool
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list (ExecutorDetails. 1 1)) cluster)
+ (.init free-pool cluster node-map)
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (let [ns-count-1 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 1)
+ ns-count-3 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 3)
+ ns-count-4 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 4)
+ ns-count-5 (.getNodeAndSlotCountIfSlotsWereTaken free-pool 5)]
+ (is (= 1 (._nodes ns-count-1)))
+ (is (= 4 (._slots ns-count-1)))
+ (is (= 1 (._nodes ns-count-3)))
+ (is (= 4 (._slots ns-count-3)))
+ (is (= 1 (._nodes ns-count-4)))
+ (is (= 4 (._slots ns-count-4)))
+ (is (= 2 (._nodes ns-count-5)))
+ (is (= 8 (._slots ns-count-5)))
+ )
+ (let [nodes (.takeNodesBySlots free-pool 5)]
+ (is (= 2 (.size nodes)))
+ (is (= 8 (Node/countFreeSlotsAlive nodes)))
+ (is (= 8 (Node/countTotalSlotsAlive nodes)))
+ (is (= 2 (.nodesAvailable free-pool)))
+ (is (= (* 2 4) (.slotsAvailable free-pool)))
+ )
+ (let [nodes (.takeNodes free-pool 3)] ;;Only 2 should be left
+ (is (= 2 (.size nodes)))
+ (is (= 8 (Node/countFreeSlotsAlive nodes)))
+ (is (= 8 (Node/countTotalSlotsAlive nodes)))
+ (is (= 0 (.nodesAvailable free-pool)))
+ (is (= 0 (.slotsAvailable free-pool)))
+ )))
+
+(deftest test-default-pool-simple
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ default-pool (DefaultPool. )
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 2
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init default-pool cluster node-map)
+ (is (= true (.canAdd default-pool topology1)))
+ (.addTopology default-pool topology1)
+ ;;Only 1 node is in the default-pool because only one nodes was scheduled already
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 2 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-default-pool-big-request
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ default-pool (DefaultPool. )
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 5
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init default-pool cluster node-map)
+ (is (= true (.canAdd default-pool topology1)))
+ (.addTopology default-pool topology1)
+ ;;Only 1 node is in the default-pool because only one nodes was scheduled already
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 3 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= "Fully Scheduled (requested 5 slots, but could only use 3)" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-default-pool-big-request-2
+ (let [supers (gen-supervisors 1)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ default-pool (DefaultPool. )
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor4 (ed 4)
+ executor5 (ed 5)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 5
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt1"
+ executor4 "bolt1"
+ executor5 "bolt2"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init default-pool cluster node-map)
+ (is (= true (.canAdd default-pool topology1)))
+ (.addTopology default-pool topology1)
+ ;;Only 1 node is in the default-pool because only one nodes was scheduled already
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= 0 (.slotsAvailable free-pool)))
+ (is (= 0 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= 0 (.slotsAvailable free-pool)))
+ (is (= 0 (.nodesAvailable free-pool)))
+ (is (= 4 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= "Running with fewer slots than requested (4/5)" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-default-pool-full
+ (let [supers (gen-supervisors 2) ;;make 2 supervisors but only schedule with one of them
+ single-super {(ffirst supers) (second (first supers))}
+ single-cluster (Cluster. (nimbus/standalone-nimbus) single-super {})
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor4 (ed 4)
+ executor5 (ed 5)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 5
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"
+ executor4 "bolt3"
+ executor5 "bolt4"})]
+ (let [node-map (Node/getAllNodesFrom single-cluster)
+ free-pool (FreePool. )
+ default-pool (DefaultPool. )]
+ (.init free-pool single-cluster node-map)
+ (.init default-pool single-cluster node-map)
+ (.addTopology default-pool topology1)
+ (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+ ;; The cluster should be full and have 4 slots used, but the topology would like 1 more
+ (is (= 4 (.size (.getUsedSlots single-cluster))))
+ (is (= "Running with fewer slots than requested (4/5)" (.get (.getStatusMap single-cluster) "topology1")))
+ )
+
+ (let [cluster (Cluster. (nimbus/standalone-nimbus) supers (.getAssignments single-cluster))
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ default-pool (DefaultPool. )]
+ (.init free-pool cluster node-map)
+ (.init default-pool cluster node-map)
+ (.addTopology default-pool topology1)
+ (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+ ;; The cluster should now have 5 slots used
+ (is (= 5 (.size (.getUsedSlots cluster))))
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+ )
+))
+
+
+(deftest test-default-pool-complex
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ default-pool (DefaultPool. )
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor11 (ed 11)
+ executor12 (ed 12)
+ executor13 (ed 13)
+ executor14 (ed 14)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 2
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"})
+ topology2 (TopologyDetails. "topology2"
+ {TOPOLOGY-NAME "topology-name-2"}
+ (StormTopology.)
+ 4
+ {executor11 "spout11"
+ executor12 "bolt12"
+ executor13 "bolt13"
+ executor14 "bolt14"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init default-pool cluster node-map)
+ (is (= true (.canAdd default-pool topology1)))
+ (.addTopology default-pool topology1)
+ (is (= true (.canAdd default-pool topology2)))
+ (.addTopology default-pool topology2)
+ ;;Only 1 node is in the default-pool because only one nodes was scheduled already
+ (is (= 4 (.slotsAvailable default-pool)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= nil (.getAssignmentById cluster "topology2")))
+ (.scheduleAsNeeded default-pool (into-array NodePool [free-pool]))
+ ;;We steal a node from the free pool to handle the extra
+ (is (= 8 (.slotsAvailable default-pool)))
+ (is (= 2 (.nodesAvailable default-pool)))
+ (is (= (* 3 4) (.slotsAvailable free-pool)))
+ (is (= 3 (.nodesAvailable free-pool)))
+ (is (= 2 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= 4 (.size (.getSlots (.getAssignmentById cluster "topology2")))))
+ (let [ns-count-1 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 1)
+ ns-count-3 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 3)
+ ns-count-4 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 4)
+ ns-count-5 (.getNodeAndSlotCountIfSlotsWereTaken default-pool 5)]
+ (is (= 1 (._nodes ns-count-1)))
+ (is (= 4 (._slots ns-count-1)))
+ (is (= 1 (._nodes ns-count-3)))
+ (is (= 4 (._slots ns-count-3)))
+ (is (= 1 (._nodes ns-count-4)))
+ (is (= 4 (._slots ns-count-4)))
+ (is (= 2 (._nodes ns-count-5)))
+ (is (= 8 (._slots ns-count-5)))
+ )
+ (let [nodes (.takeNodesBySlots default-pool 3)]
+ (is (= 1 (.size nodes)))
+ (is (= 4 (Node/countFreeSlotsAlive nodes)))
+ (is (= 4 (Node/countTotalSlotsAlive nodes)))
+ (is (= 1 (.nodesAvailable default-pool)))
+ (is (= (* 1 4) (.slotsAvailable default-pool)))
+ )
+ (let [nodes (.takeNodes default-pool 3)] ;;Only 1 should be left
+ (is (= 1 (.size nodes)))
+ (is (= 4 (Node/countFreeSlotsAlive nodes)))
+ (is (= 4 (Node/countTotalSlotsAlive nodes)))
+ (is (= 0 (.nodesAvailable default-pool)))
+ (is (= 0 (.slotsAvailable default-pool)))
+ )
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))
+))
+
+(deftest test-isolated-pool-simple
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ isolated-pool (IsolatedPool. 5)
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor4 (ed 4)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"
+ TOPOLOGY-ISOLATED-MACHINES 4}
+ (StormTopology.)
+ 4
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"
+ executor4 "bolt4"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init isolated-pool cluster node-map)
+ (is (= true (.canAdd isolated-pool topology1)))
+ (.addTopology isolated-pool topology1)
+ ;;Isolated topologies cannot have their resources stolen
+ (is (= 0 (.slotsAvailable isolated-pool)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+ (is (= 0 (.slotsAvailable isolated-pool)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= (* 1 4) (.slotsAvailable free-pool)))
+ (is (= 1 (.nodesAvailable free-pool)))
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+ ;; 4 slots on 4 machines
+ (is (= 4 (.size assigned-slots)))
+ (is (= 4 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (is (= "Scheduled Isolated on 4 Nodes" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-isolated-pool-big-ask
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ isolated-pool (IsolatedPool. 5)
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor4 (ed 4)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"
+ TOPOLOGY-ISOLATED-MACHINES 4}
+ (StormTopology.)
+ 10
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"
+ executor4 "bolt4"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init isolated-pool cluster node-map)
+ (is (= true (.canAdd isolated-pool topology1)))
+ (.addTopology isolated-pool topology1)
+ ;;Isolated topologies cannot have their resources stolen
+ (is (= 0 (.slotsAvailable isolated-pool)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+ (is (= 0 (.slotsAvailable isolated-pool)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= (* 1 4) (.slotsAvailable free-pool)))
+ (is (= 1 (.nodesAvailable free-pool)))
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+ ;; 4 slots on 4 machines
+ (is (= 4 (.size assigned-slots)))
+ (is (= 4 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (is (= "Scheduled Isolated on 4 Nodes" (.get (.getStatusMap cluster) "topology1")))
+))
+
+(deftest test-isolated-pool-complex
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ isolated-pool (IsolatedPool. 5)
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor4 (ed 4)
+ executor11 (ed 11)
+ executor12 (ed 12)
+ executor13 (ed 13)
+ executor14 (ed 14)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 4
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"
+ executor4 "bolt4"})
+ topology2 (TopologyDetails. "topology2"
+ {TOPOLOGY-NAME "topology-name-2"
+ TOPOLOGY-ISOLATED-MACHINES 2}
+ (StormTopology.)
+ 4
+ {executor11 "spout11"
+ executor12 "bolt12"
+ executor13 "bolt13"
+ executor14 "bolt14"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init isolated-pool cluster node-map)
+ (is (= true (.canAdd isolated-pool topology1)))
+ (.addTopology isolated-pool topology1)
+ (is (= true (.canAdd isolated-pool topology2)))
+ (.addTopology isolated-pool topology2)
+ ;; nodes can be stolen from non-isolted tops in the pool
+ (is (= 4 (.slotsAvailable isolated-pool)))
+ (is (= 1 (.nodesAvailable isolated-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= nil (.getAssignmentById cluster "topology2")))
+ (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+ ;;We steal 2 nodes from the free pool to handle the extra (but still only 1 node for the non-isolated top
+ (is (= 4 (.slotsAvailable isolated-pool)))
+ (is (= 1 (.nodesAvailable isolated-pool)))
+ (is (= (* 2 4) (.slotsAvailable free-pool)))
+ (is (= 2 (.nodesAvailable free-pool)))
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+ ;; 4 slots on 1 machine
+ (is (= 4 (.size assigned-slots)))
+ (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology2"))]
+ ;; 4 slots on 2 machines
+ (is (= 4 (.size assigned-slots)))
+ (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+
+ (let [ns-count-1 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 1)
+ ns-count-3 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 3)
+ ns-count-4 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 4)
+ ns-count-5 (.getNodeAndSlotCountIfSlotsWereTaken isolated-pool 5)]
+ (is (= 1 (._nodes ns-count-1)))
+ (is (= 4 (._slots ns-count-1)))
+ (is (= 1 (._nodes ns-count-3)))
+ (is (= 4 (._slots ns-count-3)))
+ (is (= 1 (._nodes ns-count-4)))
+ (is (= 4 (._slots ns-count-4)))
+ (is (= 1 (._nodes ns-count-5))) ;;Only 1 node can be stolen right now
+ (is (= 4 (._slots ns-count-5)))
+ )
+ (let [nodes (.takeNodesBySlots isolated-pool 3)]
+ (is (= 1 (.size nodes)))
+ (is (= 4 (Node/countFreeSlotsAlive nodes)))
+ (is (= 4 (Node/countTotalSlotsAlive nodes)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= (* 0 4) (.slotsAvailable isolated-pool)))
+ )
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+ ;; 4 slots on 1 machine
+ (is (= 0 (.size assigned-slots)))
+ (is (= 0 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology2"))]
+ ;; 4 slots on 2 machines
+ (is (= 4 (.size assigned-slots)))
+ (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (let [nodes (.takeNodes isolated-pool 3)] ;;Cannot steal from the isolated scheduler
+ (is (= 0 (.size nodes)))
+ (is (= 0 (Node/countFreeSlotsAlive nodes)))
+ (is (= 0 (Node/countTotalSlotsAlive nodes)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= 0 (.slotsAvailable isolated-pool)))
+ )
+ (is (= "Scheduled Isolated on 1 Nodes" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+))
+
+(deftest test-isolated-pool-complex-2
+ (let [supers (gen-supervisors 5)
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ free-pool (FreePool. )
+ ;;like before but now we can only hold 2 nodes max. Don't go over
+ isolated-pool (IsolatedPool. 2)
+ executor1 (ed 1)
+ executor2 (ed 2)
+ executor3 (ed 3)
+ executor4 (ed 4)
+ executor11 (ed 11)
+ executor12 (ed 12)
+ executor13 (ed 13)
+ executor14 (ed 14)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"}
+ (StormTopology.)
+ 4
+ {executor1 "spout1"
+ executor2 "bolt1"
+ executor3 "bolt2"
+ executor4 "bolt4"})
+ topology2 (TopologyDetails. "topology2"
+ {TOPOLOGY-NAME "topology-name-2"
+ TOPOLOGY-ISOLATED-MACHINES 2}
+ (StormTopology.)
+ 4
+ {executor11 "spout11"
+ executor12 "bolt12"
+ executor13 "bolt13"
+ executor14 "bolt14"})]
+ ;; assign one node so it is not in the pool
+ (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
+ (.init free-pool cluster node-map)
+ (.init isolated-pool cluster node-map)
+ (is (= true (.canAdd isolated-pool topology1)))
+ (.addTopology isolated-pool topology1)
+ (is (= true (.canAdd isolated-pool topology2)))
+ (.addTopology isolated-pool topology2)
+ ;; nodes can be stolen from non-isolted tops in the pool
+ (is (= 4 (.slotsAvailable isolated-pool)))
+ (is (= 1 (.nodesAvailable isolated-pool)))
+ (is (= (* 4 4) (.slotsAvailable free-pool)))
+ (is (= 4 (.nodesAvailable free-pool)))
+ (is (= 1 (.size (.getSlots (.getAssignmentById cluster "topology1")))))
+ (is (= nil (.getAssignmentById cluster "topology2")))
+ (.scheduleAsNeeded isolated-pool (into-array NodePool [free-pool]))
+ ;;We steal 1 node from the free pool and 1 from ourself to handle the extra
+ (is (= 0 (.slotsAvailable isolated-pool)))
+ (is (= 0 (.nodesAvailable isolated-pool)))
+ (is (= (* 3 4) (.slotsAvailable free-pool)))
+ (is (= 3 (.nodesAvailable free-pool)))
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology1"))]
+ ;; 0 slots on 0 machine
+ (is (= 0 (.size assigned-slots)))
+ (is (= 0 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (let [assigned-slots (.getSlots (.getAssignmentById cluster "topology2"))]
+ ;; 4 slots on 2 machines
+ (is (= 4 (.size assigned-slots)))
+ (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (is (= "Max Nodes(2) for this user would be exceeded. 1 more nodes needed to run topology." (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+))
+
+(deftest test-multitenant-scheduler
+ (let [supers (gen-supervisors 10)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"
+ TOPOLOGY-SUBMITTER-USER "userC"}
+ (StormTopology.)
+ 4
+ (mk-ed-map [["spout1" 0 5]
+ ["bolt1" 5 10]
+ ["bolt2" 10 15]
+ ["bolt3" 15 20]]))
+ topology2 (TopologyDetails. "topology2"
+ {TOPOLOGY-NAME "topology-name-2"
+ TOPOLOGY-ISOLATED-MACHINES 2
+ TOPOLOGY-SUBMITTER-USER "userA"}
+ (StormTopology.)
+ 4
+ (mk-ed-map [["spout11" 0 5]
+ ["bolt12" 5 6]
+ ["bolt13" 6 7]
+ ["bolt14" 7 10]]))
+ topology3 (TopologyDetails. "topology3"
+ {TOPOLOGY-NAME "topology-name-3"
+ TOPOLOGY-ISOLATED-MACHINES 5
+ TOPOLOGY-SUBMITTER-USER "userB"}
+ (StormTopology.)
+ 10
+ (mk-ed-map [["spout21" 0 10]
+ ["bolt22" 10 20]
+ ["bolt23" 20 30]
+ ["bolt24" 30 40]]))
+ cluster (Cluster. (nimbus/standalone-nimbus) supers {})
+ node-map (Node/getAllNodesFrom cluster)
+ topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+ conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+ scheduler (MultitenantScheduler.)]
+ (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
+ (.assign (.get node-map "super1") "topology2" (list (ed 5)) cluster)
+ (.prepare scheduler conf)
+ (.schedule scheduler topologies cluster)
+ (let [assignment (.getAssignmentById cluster "topology1")
+ assigned-slots (.getSlots assignment)
+ executors (.getExecutors assignment)]
+ ;; 4 slots on 1 machine, all executors assigned
+ (is (= 4 (.size assigned-slots)))
+ (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ (is (= 20 (.size executors)))
+ )
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+ (is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
+))
+
+
+(deftest test-multitenant-scheduler-bad-starting-state
+ (let [supers (gen-supervisors 10)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"
+ TOPOLOGY-SUBMITTER-USER "userC"}
+ (StormTopology.)
+ 4
+ (mk-ed-map [["spout1" 0 5]
+ ["bolt1" 5 10]
+ ["bolt2" 10 15]
+ ["bolt3" 15 20]]))
+ topology2 (TopologyDetails. "topology2"
+ {TOPOLOGY-NAME "topology-name-2"
+ TOPOLOGY-ISOLATED-MACHINES 2
+ TOPOLOGY-SUBMITTER-USER "userA"}
+ (StormTopology.)
+ 4
+ (mk-ed-map [["spout11" 0 5]
+ ["bolt12" 5 6]
+ ["bolt13" 6 7]
+ ["bolt14" 7 10]]))
+ topology3 (TopologyDetails. "topology3"
+ {TOPOLOGY-NAME "topology-name-3"
+ TOPOLOGY-ISOLATED-MACHINES 5
+ TOPOLOGY-SUBMITTER-USER "userB"}
+ (StormTopology.)
+ 10
+ (mk-ed-map [["spout21" 0 10]
+ ["bolt22" 10 20]
+ ["bolt23" 20 30]
+ ["bolt24" 30 40]]))
+ existing-assignments {
+ "topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorDetails. 0 5) (WorkerSlot. "super1" 1)})
+ "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorDetails. 0 10) (WorkerSlot. "super1" 1)})
+ }
+ cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments)
+ topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+ conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+ scheduler (MultitenantScheduler.)]
+ (.prepare scheduler conf)
+ (.schedule scheduler topologies cluster)
+ (let [assignment (.getAssignmentById cluster "topology1")
+ assigned-slots (.getSlots assignment)
+ executors (.getExecutors assignment)]
+ ;; 4 slots on 1 machine, all executors assigned
+ (is (= 4 (.size assigned-slots)))
+ (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ (is (= 20 (.size executors)))
+ )
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Scheduled Isolated on 2 Nodes" (.get (.getStatusMap cluster) "topology2")))
+ (is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
+))
+
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj b/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
index ed21904..2d96b18 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
@@ -14,13 +14,16 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.security.auth.AuthUtils-test
- (:import [backtype.storm.security.auth AuthUtils])
+ (:import [backtype.storm.security.auth AuthUtils IAutoCredentials])
(:import [java.io IOException])
(:import [javax.security.auth.login AppConfigurationEntry Configuration])
(:import [org.mockito Mockito])
(:use [clojure test])
+ (:use [backtype.storm bootstrap])
)
+(bootstrap)
+
(deftest test-throws-on-missing-section
(is (thrown? IOException
(AuthUtils/get (Mockito/mock Configuration) "bogus-section" "")))
@@ -61,5 +64,16 @@
(is (not (nil? (AuthUtils/get conf section k))))
(is (= (AuthUtils/get conf section k) expected))
)
+ ))
+
+(deftest test-empty-auto-creds
+ (let [result (AuthUtils/GetAutoCredentials {})]
+ (is (.isEmpty result))
+ )
+)
+
+(deftest test-empty-creds-renewers
+ (let [result (AuthUtils/GetCredentialRenewers {})]
+ (is (.isEmpty result))
)
)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj b/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj
new file mode 100644
index 0000000..ab54d82
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/security/auth/DefaultHttpCredentialsPlugin_test.clj
@@ -0,0 +1,40 @@
+(ns backtype.storm.security.auth.DefaultHttpCredentialsPlugin-test
+ (:use [clojure test])
+ (:import [javax.security.auth Subject])
+ (:import [javax.servlet.http HttpServletRequest])
+ (:import [org.mockito Mockito])
+ (:import [backtype.storm.security.auth DefaultHttpCredentialsPlugin
+ ReqContext SingleUserPrincipal])
+ )
+
+(deftest test-getUserName
+ (let [handler (doto (DefaultHttpCredentialsPlugin.) (.prepare {}))]
+ (testing "returns null when request is null"
+ (is (nil? (.getUserName handler nil))))
+
+ (testing "returns null when user principal is null"
+ (let [req (Mockito/mock HttpServletRequest)]
+ (is (nil? (.getUserName handler req)))))
+
+ (testing "returns null when user is blank"
+ (let [princ (SingleUserPrincipal. "")
+ req (Mockito/mock HttpServletRequest)]
+ (. (Mockito/when (. req getUserPrincipal))
+ thenReturn princ)
+ (is (nil? (.getUserName handler req)))))
+
+ (testing "returns correct user from requests principal"
+ (let [exp-name "Alice"
+ princ (SingleUserPrincipal. exp-name)
+ req (Mockito/mock HttpServletRequest)]
+ (. (Mockito/when (. req getUserPrincipal))
+ thenReturn princ)
+ (is (.equals exp-name (.getUserName handler req)))))))
+
+(deftest test-populate-req-context-noop-on-null-user
+ (let [req (Mockito/mock HttpServletRequest)
+ handler (doto (DefaultHttpCredentialsPlugin.) (.prepare {}))
+ expected-subj (Subject.)
+ context (ReqContext. expected-subj)]
+ (is (.equals expected-subj
+ (-> handler (.populateContext context req) (.subject))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
index 7dcd86d..2eee963 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/ThriftClient_test.clj
@@ -14,28 +14,32 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.security.auth.ThriftClient-test
- (:use [backtype.storm config])
+ (:use [backtype.storm config util])
(:use [clojure test])
- (:import [backtype.storm.security.auth ThriftClient])
+ (:import [backtype.storm.security.auth ThriftClient ThriftConnectionType])
(:import [org.apache.thrift.transport TTransportException])
)
(deftest test-ctor-throws-if-port-invalid
- (let [conf (read-default-config)
+ (let [conf (merge
+ (read-default-config)
+ {STORM-NIMBUS-RETRY-TIMES 0})
timeout (Integer. 30)]
- (is (thrown? java.lang.IllegalArgumentException
- (ThriftClient. conf "bogushost" -1 timeout)))
- (is (thrown? java.lang.IllegalArgumentException
- (ThriftClient. conf "bogushost" 0 timeout)))
+ (is (thrown-cause? java.lang.IllegalArgumentException
+ (ThriftClient. conf ThriftConnectionType/DRPC "bogushost" (int -1) timeout)))
+ (is (thrown-cause? java.lang.IllegalArgumentException
+ (ThriftClient. conf ThriftConnectionType/DRPC "bogushost" (int 0) timeout)))
)
)
(deftest test-ctor-throws-if-host-not-set
- (let [conf (read-default-config)
+ (let [conf (merge
+ (read-default-config)
+ {STORM-NIMBUS-RETRY-TIMES 0})
timeout (Integer. 60)]
- (is (thrown? TTransportException
- (ThriftClient. conf "" 4242 timeout)))
- (is (thrown? IllegalArgumentException
- (ThriftClient. conf nil 4242 timeout)))
+ (is (thrown-cause? TTransportException
+ (ThriftClient. conf ThriftConnectionType/DRPC "" (int 4242) timeout)))
+ (is (thrown-cause? IllegalArgumentException
+ (ThriftClient. conf ThriftConnectionType/DRPC nil (int 4242) timeout)))
)
)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
index 6213d4f..c8ed70e 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/ThriftServer_test.clj
@@ -16,14 +16,16 @@
(ns backtype.storm.security.auth.ThriftServer-test
(:use [backtype.storm config])
(:use [clojure test])
- (:import [backtype.storm.security.auth ThriftServer])
+ (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType])
(:import [org.apache.thrift.transport TTransportException])
)
(deftest test-stop-checks-for-null
- (let [server (ThriftServer. (read-default-config) nil 12345)]
+ (let [server (ThriftServer. (read-default-config) nil
+ ThriftConnectionType/DRPC)]
(.stop server)))
(deftest test-isServing-checks-for-null
- (let [server (ThriftServer. (read-default-config) nil 12345)]
+ (let [server (ThriftServer. (read-default-config) nil
+ ThriftConnectionType/DRPC)]
(is (not (.isServing server)))))