You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 21:01:31 UTC

[2/8] storm git commit: STORM-2018: Supervisor V2

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index d85b06b..a80d3d2 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -85,7 +85,6 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -153,7 +152,7 @@ public class Utils {
         return oldInstance;
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+    public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
     public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
     public static final String CURRENT_BLOB_SUFFIX_ID = "current";
@@ -172,10 +171,11 @@ public class Utils {
     public static final int SIGTERM = 15;
 
     static {
-        Map conf = readStormConfig();
+        Map<String, Object> conf = readStormConfig();
         serializationDelegate = getSerializationDelegate(conf);
     }
 
+    @SuppressWarnings("unchecked")
     public static <T> T newInstance(String klass) {
         try {
             return newInstance((Class<T>)Class.forName(klass));
@@ -213,9 +213,9 @@ public class Utils {
         return serializationDelegate.deserialize(serialized, clazz);
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) {
+    public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {
         try {
-            T ret = (T) c.newInstance();
+            T ret = c.newInstance();
             TDeserializer des = getDes();
             des.deserialize((TBase) ret, b, offset, length);
             return ret;
@@ -353,16 +353,17 @@ public class Utils {
         }
     }
 
-    public static Map findAndReadConfigFile(String name, boolean mustExist) {
+    public static Map<String, Object> findAndReadConfigFile(String name, boolean mustExist) {
         InputStream in = null;
         boolean confFileEmpty = false;
         try {
             in = getConfigFileInputStream(name);
             if (null != in) {
                 Yaml yaml = new Yaml(new SafeConstructor());
-                Map ret = (Map) yaml.load(new InputStreamReader(in));
+                @SuppressWarnings("unchecked")
+                Map<String, Object> ret = (Map<String, Object>) yaml.load(new InputStreamReader(in));
                 if (null != ret) {
-                    return new HashMap(ret);
+                    return new HashMap<>(ret);
                 } else {
                     confFileEmpty = true;
                 }
@@ -374,7 +375,7 @@ public class Utils {
                 else
                     throw new RuntimeException("Could not find config file on classpath " + name);
             } else {
-                return new HashMap();
+                return new HashMap<>();
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -416,16 +417,16 @@ public class Utils {
     }
 
 
-    public static Map findAndReadConfigFile(String name) {
+    public static Map<String, Object> findAndReadConfigFile(String name) {
         return findAndReadConfigFile(name, true);
     }
 
-    public static Map readDefaultConfig() {
+    public static Map<String, Object> readDefaultConfig() {
         return findAndReadConfigFile("defaults.yaml", true);
     }
 
-    public static Map readCommandLineOpts() {
-        Map ret = new HashMap();
+    public static Map<String, Object> readCommandLineOpts() {
+        Map<String, Object> ret = new HashMap<>();
         String commandOptions = System.getProperty("storm.options");
         if (commandOptions != null) {
             /*
@@ -456,10 +457,10 @@ public class Utils {
         return ret;
     }
 
-    public static Map readStormConfig() {
-        Map ret = readDefaultConfig();
+    public static Map<String, Object> readStormConfig() {
+        Map<String, Object> ret = readDefaultConfig();
         String confFile = System.getProperty("storm.conf.file");
-        Map storm;
+        Map<String, Object> storm;
         if (confFile == null || confFile.equals("")) {
             storm = findAndReadConfigFile("storm.yaml", false);
         } else {
@@ -568,6 +569,11 @@ public class Utils {
      */
     public static void downloadResourcesAsSupervisor(String key, String localFile,
                                                      ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
+        _instance.downloadResourcesAsSupervisorImpl(key, localFile, cb);
+    }
+
+    public void downloadResourcesAsSupervisorImpl(String key, String localFile,
+            ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
         final int MAX_RETRY_ATTEMPTS = 2;
         final int ATTEMPTS_INTERVAL_TIME = 100;
         for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) {
@@ -586,11 +592,8 @@ public class Utils {
 
     private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) {
         boolean isSuccess = false;
-        FileOutputStream out = null;
-        InputStreamWithMeta in = null;
-        try {
-            out = new FileOutputStream(localFile);
-            in = cb.getBlob(key);
+        try (FileOutputStream out = new FileOutputStream(localFile);
+                InputStreamWithMeta in = cb.getBlob(key);) {
             long fileSize = in.getFileLength();
 
             byte[] buffer = new byte[1024];
@@ -604,17 +607,6 @@ public class Utils {
             isSuccess = (fileSize == downloadFileSize);
         } catch (TException | IOException e) {
             LOG.error("An exception happened while downloading {} from blob store.", localFile, e);
-        } finally {
-            try {
-                if (out != null) {
-                    out.close();
-                }
-            } catch (IOException ignored) {}
-            try {
-                if (in != null) {
-                    in.close();
-                }
-            } catch (IOException ignored) {}
         }
         if (!isSuccess) {
             try {
@@ -626,6 +618,10 @@ public class Utils {
         return isSuccess;
     }
 
+    public static boolean checkFileExists(File path) {
+        return Files.exists(path.toPath());
+    }
+    
     public static boolean checkFileExists(String path) {
         return Files.exists(new File(path).toPath());
     }
@@ -781,7 +777,7 @@ public class Utils {
         }
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b) {
+    public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
         try {
             return Utils.thriftDeserialize(c, b, 0, b.length);
         } catch (Exception e) {
@@ -1872,9 +1868,12 @@ public class Utils {
      * @param jarpath Path to the jar file
      * @param dir Directory in the jar to pull out
      * @param destdir Path to the directory where the extracted directory will be put
-     *
      */
-    public static void extractDirFromJar(String jarpath, String dir, String destdir) {
+    public static void extractDirFromJar(String jarpath, String dir, File destdir) {
+        _instance.extractDirFromJarImpl(jarpath, dir, destdir);
+    }
+    
+    public void extractDirFromJarImpl(String jarpath, String dir, File destdir) {
         try (JarFile jarFile = new JarFile(jarpath)) {
             Enumeration<JarEntry> jarEnums = jarFile.entries();
             while (jarEnums.hasMoreElements()) {
@@ -1990,34 +1989,6 @@ public class Utils {
     }
 
     /**
-     * Creates a symbolic link to the target
-     * @param dir the parent directory of the link
-     * @param targetDir the parent directory of the link's target
-     * @param filename the file name of the link
-     * @param targetFilename the file name of the links target
-     * @throws IOException
-     */
-    public static void createSymlink(String dir, String targetDir,
-            String filename, String targetFilename) throws IOException {
-        Path path = Paths.get(dir, filename).toAbsolutePath();
-        Path target = Paths.get(targetDir, targetFilename).toAbsolutePath();
-        LOG.debug("Creating symlink [{}] to [{}]", path, target);
-        if (!path.toFile().exists()) {
-            Files.createSymbolicLink(path, target);
-        }
-    }
-
-    /**
-     * Convenience method for the case when the link's file name should be the
-     * same as the file name of the target
-     */
-    public static void createSymlink(String dir, String targetDir,
-                                     String targetFilename) throws IOException {
-        Utils.createSymlink(dir, targetDir, targetFilename,
-                            targetFilename);
-    }
-
-    /**
      * Returns a Collection of file names found under the given directory.
      * @param dir a directory
      * @return the Collection of file names
@@ -2047,56 +2018,6 @@ public class Utils {
         return System.getProperty("java.class.path");
     }
 
-    /**
-     * Returns a collection of jar file names found under the given directory.
-     * @param dir the directory to search
-     * @return the jar file names
-     */
-    private static List<String> getFullJars(String dir) {
-        File[] files = new File(dir).listFiles(jarFilter);
-
-        if(files == null) {
-            return new ArrayList<>();
-        }
-
-        List<String> ret = new ArrayList<>(files.length);
-        for (File f : files) {
-            ret.add(Paths.get(dir, f.getName()).toString());
-        }
-        return ret;
-    }
-    private static final FilenameFilter jarFilter = new FilenameFilter() {
-            @Override
-            public boolean accept(File dir, String name) {
-                return name.endsWith(".jar");
-            }
-        };
-
-
-    public static String workerClasspath() {
-        String stormDir = System.getProperty("storm.home");
-
-        if (stormDir == null) {
-            return Utils.currentClasspath();
-        }
-
-        String stormLibDir = Paths.get(stormDir, "lib").toString();
-        String stormConfDir =
-                System.getenv("STORM_CONF_DIR") != null ?
-                System.getenv("STORM_CONF_DIR") :
-                Paths.get(stormDir, "conf").toString();
-        String stormExtlibDir = Paths.get(stormDir, "extlib").toString();
-        String extcp = System.getenv("STORM_EXT_CLASSPATH");
-        List<String> pathElements = new LinkedList<>();
-        pathElements.addAll(Utils.getFullJars(stormLibDir));
-        pathElements.addAll(Utils.getFullJars(stormExtlibDir));
-        pathElements.add(extcp);
-        pathElements.add(stormConfDir);
-
-        return StringUtils.join(pathElements,
-                CLASS_PATH_SEPARATOR);
-    }
-
     public static String addToClasspath(String classpath,
                 Collection<String> paths) {
         return _instance.addToClasspathImpl(classpath, paths);
@@ -2281,78 +2202,6 @@ public class Utils {
                 null);
     }
 
-    /**
-     * A callback that can accept an integer.
-     * @param <V> the result type of method <code>call</code>
-     */
-    public interface ExitCodeCallable<V> extends Callable<V> {
-        V call(int exitCode);
-    }
-
-    /**
-     * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
-     * callback.
-     * @param command the command to be executed in the new process
-     * @param environment the environment to be applied to the process. Can be
-     *                    null.
-     * @param logPrefix a prefix for log entries from the output of the process.
-     *                  Can be null.
-     * @param exitCodeCallback code to be called passing the exit code value
-     *                         when the process completes
-     * @param dir the working directory of the new process
-     * @return the new process
-     * @throws IOException
-     * @see java.lang.ProcessBuilder
-     */
-    public static Process launchProcess(List<String> command,
-                                        Map<String,String> environment,
-                                        final String logPrefix,
-                                        final ExitCodeCallable exitCodeCallback,
-                                        File dir)
-            throws IOException {
-        return _instance.launchProcessImpl(command, environment, logPrefix,
-                exitCodeCallback, dir);
-    }
-
-    public Process launchProcessImpl(
-            List<String> command,
-            Map<String,String> cmdEnv,
-            final String logPrefix,
-            final ExitCodeCallable exitCodeCallback,
-            File dir)
-            throws IOException {
-        ProcessBuilder builder = new ProcessBuilder(command);
-        Map<String,String> procEnv = builder.environment();
-        if (dir != null) {
-            builder.directory(dir);
-        }
-        builder.redirectErrorStream(true);
-        if (cmdEnv != null) {
-            procEnv.putAll(cmdEnv);
-        }
-        final Process process = builder.start();
-        if (logPrefix != null || exitCodeCallback != null) {
-            Utils.asyncLoop(new Callable() {
-                public Object call() {
-                    if (logPrefix != null ) {
-                        Utils.readAndLogStream(logPrefix,
-                                process.getInputStream());
-                    }
-                    if (exitCodeCallback != null) {
-                        try {
-                            process.waitFor();
-                        } catch (InterruptedException ie) {
-                            LOG.info("{} interrupted", logPrefix);
-                            exitCodeCallback.call(process.exitValue());
-                        }
-                    }
-                    return null; // Run only once.
-                }
-            });
-        }
-        return process;
-    }
-
     public static <T> List<T> interleaveAll(List<List<T>> nodeList) {
         if (nodeList != null && nodeList.size() > 0) {
             List<T> first = new ArrayList<T>();

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 7723922..0580f41 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -36,6 +36,7 @@ import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.VersionedData;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.ConfigUtils;
@@ -388,8 +389,15 @@ public class Zookeeper {
             leaderLatchListenerAtomicReference, blobStore);
     }
 
-    public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
-        Map map = new HashMap();
+    /**
+     * Get the data along with a version
+     * @param zk the zk instance to use
+     * @param path the path to get it from
+     * @param watch should a watch be enabled
+     * @return null if no data is found, else the data with the version.
+     */
+    public static VersionedData<byte[]> getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
+        VersionedData<byte[]> data = null;
         try {
             byte[] bytes = null;
             Stat stats = new Stat();
@@ -402,8 +410,7 @@ public class Zookeeper {
                 }
                 if (bytes != null) {
                     int version = stats.getVersion();
-                    map.put(IStateStorage.DATA, bytes);
-                    map.put(IStateStorage.VERSION, version);
+                    data = new VersionedData<>(version, bytes);
                 }
             }
         } catch (Exception e) {
@@ -413,7 +420,7 @@ public class Zookeeper {
                 Utils.wrapInRuntime(e);
             }
         }
-        return map;
+        return data;
     }
 
     public static List<String> tokenizePath(String path) {

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 787fbcd..90c2697 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -609,70 +609,3 @@
     (report-error! collector (RuntimeException.)))
   (ack! collector tuple))
 
-(deftest test-throttled-errors
-  (with-simulated-time
-    (with-tracked-cluster [cluster]
-      (let [state (:storm-cluster-state cluster)
-            [feeder checker] (ack-tracking-feeder ["num"])
-            tracked (mk-tracked-topology
-                     cluster
-                     (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails feeder)}
-                       {"2" (Thrift/prepareBoltDetails
-                              {(Utils/getGlobalStreamId "1" nil)
-                               (Thrift/prepareShuffleGrouping)}
-                              report-errors-bolt)}))
-            _       (submit-local-topology (:nimbus cluster)
-                                             "test-errors"
-                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
-                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
-                                              TOPOLOGY-DEBUG true
-                                              }
-                                             (:topology tracked))
-            _ (advance-cluster-time cluster 11)
-            storm-id (StormCommon/getStormId state "test-errors")
-            errors-count (fn [] (count (.errors state storm-id "2")))]
-
-        (is (nil? (clojurify-error (.lastError state storm-id "2"))))
-
-        ;; so it launches the topology
-        (advance-cluster-time cluster 2)
-        (.feed feeder [6])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (clojurify-error (.lastError state storm-id "2")))
-        
-        (advance-time-secs! 5)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (clojurify-error (.lastError state storm-id "2")))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 6 (errors-count)))
-        (is (clojurify-error (.lastError state storm-id "2")))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [3])
-        (tracked-wait tracked 1)
-        (is (= 8 (errors-count)))
-        (is  (clojurify-error (.lastError state storm-id "2")))))))
-
-
-(deftest test-acking-branching-complex
-  ;; test acking with branching in the topology
-  )
-
-
-(deftest test-fields-grouping
-  ;; 1. put a shitload of random tuples through it and test that counts are right
-  ;; 2. test that different spouts with different phints group the same way
-  )
-
-(deftest test-all-grouping
-  )
-
-(deftest test-direct-grouping
-  )

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 3308653..c993d5b 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -281,13 +281,13 @@
       
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
-      (assert-acked tracker 1)
       (assert-buckets! "myspout" "__fail-count/default" [] cluster)
       (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)     
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
+      (assert-acked tracker 1)
 
       (.feed feeder ["b"] 2)      
       (advance-cluster-time cluster 5)
@@ -337,12 +337,12 @@
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
       (advance-cluster-time cluster 9)
-      (assert-acked tracker 1 3)
       (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
+      (assert-acked tracker 1 3)
       
       (is (not (.isFailed tracker 2)))
       (advance-cluster-time cluster 30)

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index a9548d5..ac61390 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1188,6 +1188,14 @@
         (assert-files-in-dir [])
         ))))
 
+(defn wait-for-status [nimbus name status]
+  (while-timeout 5000
+    (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
+          topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
+      (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
+      (not= topo-status status))
+    (Thread/sleep 100)))
+
 (deftest test-leadership
   "Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
   (with-inprocess-zookeeper zk-port
@@ -1218,7 +1226,7 @@
               (submit-local-topology nimbus "t1" {} topology)
               ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
               (.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-              (Thread/sleep 1000)
+              (wait-for-status nimbus "t1" "ACTIVE") 
               (.deactivate nimbus "t1")
               (.activate nimbus "t1")
               (.rebalance nimbus "t1" (RebalanceOptions.))

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
index d0dfe2d..af492c6 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
@@ -48,10 +48,10 @@
                                      ThriftConnectionType/DRPC_INVOCATIONS)]
     (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
     (log-message "storm conf:" conf)
-    (log-message "Starting DRPC invocation server ...")
+    (log-message "Starting DRPC invocation server ... " invocations-port)
     (.start (Thread. #(.serve invoke-server)))
     (wait-for-condition #(.isServing invoke-server))
-    (log-message "Starting DRPC handler server ...")
+    (log-message "Starting DRPC handler server ... " client-port)
     (.start (Thread. #(.serve handler-server)))
     (wait-for-condition #(.isServing handler-server))
     [handler-server invoke-server]))
@@ -100,42 +100,42 @@
 
 (defmacro with-simple-drpc-test-scenario
   [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
-  (let [client-port (Utils/getAvailablePort)
-        invocations-port (Utils/getAvailablePort (int (inc client-port)))
-        storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                          {DRPC-AUTHORIZER-ACL-STRICT strict?
+  `(let [client-port# (Utils/getAvailablePort)
+         invocations-port# (Utils/getAvailablePort (int (inc client-port#)))
+         storm-conf# (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                          {DRPC-AUTHORIZER-ACL-STRICT ~strict?
                            DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
                            STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})]
-    `(with-server [~storm-conf
+    (with-server [storm-conf#
                    "org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
                    "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
                    "test/clj/org/apache/storm/security/auth/drpc-auth-server.jaas"
-                   ~client-port ~invocations-port]
+                   client-port# invocations-port#]
        (let [~alice-client (DRPCClient.
-                           (merge ~storm-conf {"java.security.auth.login.config"
+                           (merge storm-conf# {"java.security.auth.login.config"
                                               "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
                            "localhost"
-                           ~client-port)
+                           client-port#)
              ~bob-client (DRPCClient.
-                         (merge ~storm-conf {"java.security.auth.login.config"
+                         (merge storm-conf# {"java.security.auth.login.config"
                                             "test/clj/org/apache/storm/security/auth/drpc-auth-bob.jaas"})
                          "localhost"
-                         ~client-port)
+                         client-port#)
              ~charlie-client (DRPCClient.
-                               (merge ~storm-conf {"java.security.auth.login.config"
+                               (merge storm-conf# {"java.security.auth.login.config"
                                                   "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
                                "localhost"
-                               ~client-port)
+                               client-port#)
              ~alice-invok (DRPCInvocationsClient.
-                            (merge ~storm-conf {"java.security.auth.login.config"
+                            (merge storm-conf# {"java.security.auth.login.config"
                                                "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
                             "localhost"
-                            ~invocations-port)
+                            invocations-port#)
              ~charlie-invok (DRPCInvocationsClient.
-                             (merge ~storm-conf {"java.security.auth.login.config"
+                             (merge storm-conf# {"java.security.auth.login.config"
                                                 "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
                              "localhost"
-                             ~invocations-port)]
+                             invocations-port#)]
          (try
            ~@body
            (finally

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
deleted file mode 100644
index cedbd8e..0000000
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ /dev/null
@@ -1,926 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.supervisor-test
-  (:use [clojure test])
-  (:require [conjure.core])
-  (:use [conjure core])
-  (:require [clojure.contrib [string :as contrib-str]])
-  (:require [clojure [string :as string] [set :as set]])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout]
-           [org.apache.storm.daemon.supervisor SupervisorUtils SyncProcessEvent SupervisorData]
-           [java.util ArrayList Arrays HashMap]
-           [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
-           [org.apache.storm.daemon.supervisor.workermanager DefaultWorkerManager])
-  (:import [org.apache.storm.scheduler ISupervisor])
-  (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
-  (:import [org.apache.storm.generated RebalanceOptions WorkerResources])
-  (:import [org.apache.storm.testing.staticmocking MockedCluster])
-  (:import [java.util UUID])
-  (:import [org.apache.storm Thrift])
-  (:import [org.mockito Mockito Matchers])
-  (:import [org.mockito.exceptions.base MockitoAssertionError])
-  (:import [java.io File])
-  (:import [java.nio.file Files])
-  (:import [org.apache.storm.utils Utils IPredicate])
-  (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
-           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
-  (:import [java.nio.file.attribute FileAttribute] (org.apache.storm.topology TopologyBuilder))
-  (:import [org.apache.storm.daemon StormCommon])
-  (:use [org.apache.storm config testing util log converter])
-  (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm.daemon [worker :as worker] [local-supervisor :as local-supervisor]])
-  (:use [conjure core])
-  (:require [clojure.java.io :as io]))
-
-(defn worker-assignment
-  "Return [storm-id executors]"
-  [cluster supervisor-id port]
-  (let [state (:storm-cluster-state cluster)
-        slot-assigns (for [storm-id (.assignments state nil)]
-                        (let [executors (-> (clojurify-assignment (.assignmentInfo state storm-id nil))
-                                        :executor->node+port
-                                        (Utils/reverseMap)
-                                        clojurify-structure
-                                        (get [supervisor-id port] ))]
-                          (when executors [storm-id executors])
-                          ))
-        pred (reify IPredicate (test [this x] (not-nil? x)))
-        ret (Utils/findOne pred slot-assigns)]
-    (when-not ret
-      (throw (RuntimeException. "Could not find assignment for worker")))
-    ret
-    ))
-
-(defn heartbeat-worker [supervisor port storm-id executors]
-  (let [conf (.getConf supervisor)]
-    (worker/do-heartbeat {:conf conf
-                          :port port
-                          :storm-id storm-id
-                          :executors executors
-                          :worker-id (find-worker-id conf port)})))
-
-(defn heartbeat-workers [cluster supervisor-id ports]
-  (let [sup (get-supervisor cluster supervisor-id)]
-    (doseq [p ports]
-      (let [[storm-id executors] (worker-assignment cluster supervisor-id p)]
-        (heartbeat-worker sup p storm-id executors)
-        ))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn validate-launched-once [launched supervisor->ports storm-id]
-  (let [counts (map count (vals launched))
-        launched-supervisor->ports (apply merge-with set/union
-                                          (for [[[s p] sids] launched
-                                                :when (some #(= % storm-id) sids)]
-                                            {s #{p}}))
-        supervisor->ports (map-val set supervisor->ports)]
-    (is (every? (partial = 1) counts))
-    (is (= launched-supervisor->ports supervisor->ports))
-    ))
-
-(defmacro letlocals
-  [& body]
-  (let [[tobind lexpr] (split-at (dec (count body)) body)
-        binded (vec (mapcat (fn [e]
-                              (if (and (list? e) (= 'bind (first e)))
-                                [(second e) (last e)]
-                                ['_ e]
-                                ))
-                            tobind))]
-    `(let ~binded
-       ~(first lexpr))))
-
-(deftest launches-assignment
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                  SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
-                  SUPERVISOR-WORKER-TIMEOUT-SECS 15
-                  SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
-    (letlocals
-      (bind topology (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails
-                              (TestPlannerSpout. true) (Integer. 4))}
-                       {}))
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind changed (capture-changed-workers
-                      (submit-mocked-assignment
-                        (:nimbus cluster)
-                        (:storm-cluster-state cluster)
-                        "test"
-                        {TOPOLOGY-WORKERS 3}
-                        topology
-                        {1 "1"
-                         2 "1"
-                         3 "1"
-                         4 "1"}
-                        {[1 1] ["sup1" 1]
-                         [2 2] ["sup1" 2]
-                         [3 3] ["sup1" 3]
-                         [4 4] ["sup1" 3]}
-                        {["sup1" 1] [0.0 0.0 0.0]
-                         ["sup1" 2] [0.0 0.0 0.0]
-                         ["sup1" 3] [0.0 0.0 0.0]
-                         })
-                      ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                      (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                      (advance-cluster-time cluster 2)
-                      (heartbeat-workers cluster "sup1" [1 2 3])
-                      (advance-cluster-time cluster 10)))
-      (bind storm-id (StormCommon/getStormId (:storm-cluster-state cluster) "test"))
-      (is (empty? (:shutdown changed)))
-      (validate-launched-once (:launched changed) {"sup1" [1 2 3]} storm-id)
-      (bind changed (capture-changed-workers
-                        (doseq [i (range 10)]
-                          (heartbeat-workers cluster "sup1" [1 2 3])
-                          (advance-cluster-time cluster 10))
-                        ))
-      (is (empty? (:shutdown changed)))
-      (is (empty? (:launched changed)))
-      (bind changed (capture-changed-workers
-                      (heartbeat-workers cluster "sup1" [1 2])
-                      (advance-cluster-time cluster 10)
-                      ))
-      (validate-launched-once (:launched changed) {"sup1" [3]} storm-id)
-      (is (= {["sup1" 3] 1} (:shutdown changed)))
-      )))
-
-(deftest test-multiple-active-storms-multiple-supervisors
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                  SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
-                  SUPERVISOR-WORKER-TIMEOUT-SECS 15
-                  SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
-    (letlocals
-      (bind topology (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails
-                              (TestPlannerSpout. true) (Integer. 4))}
-                       {}))
-      (bind topology2 (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails
-                              (TestPlannerSpout. true) (Integer. 3))}
-                       {}))
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2]))
-      (bind changed (capture-changed-workers
-                      (submit-mocked-assignment
-                        (:nimbus cluster)
-                        (:storm-cluster-state cluster)
-                        "test"
-                        {TOPOLOGY-WORKERS 3 TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}
-                        topology
-                        {1 "1"
-                         2 "1"
-                         3 "1"
-                         4 "1"}
-                        {[1 1] ["sup1" 1]
-                         [2 2] ["sup1" 2]
-                         [3 3] ["sup2" 1]
-                         [4 4] ["sup2" 1]}
-                        {["sup1" 1] [0.0 0.0 0.0]
-                         ["sup1" 2] [0.0 0.0 0.0]
-                         ["sup2" 1] [0.0 0.0 0.0]
-                         })
-                      ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                      (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                      (advance-cluster-time cluster 2)
-                      (heartbeat-workers cluster "sup1" [1 2])
-                      (heartbeat-workers cluster "sup2" [1])
-                      ))
-      (bind storm-id (StormCommon/getStormId (:storm-cluster-state cluster) "test"))
-      (is (empty? (:shutdown changed)))
-      (validate-launched-once (:launched changed) {"sup1" [1 2] "sup2" [1]} storm-id)
-      (bind changed (capture-changed-workers
-                      (submit-mocked-assignment
-                        (:nimbus cluster)
-                        (:storm-cluster-state cluster)
-                        "test2"
-                        {TOPOLOGY-WORKERS 2}
-                        topology2
-                        {1 "1"
-                         2 "1"
-                         3 "1"}
-                        {[1 1] ["sup1" 3]
-                         [2 2] ["sup1" 3]
-                         [3 3] ["sup2" 2]}
-                        {["sup1" 3] [0.0 0.0 0.0]
-                         ["sup2" 2] [0.0 0.0 0.0]
-                         })
-                      ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                      (.rebalance (:nimbus cluster) "test2" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                      (advance-cluster-time cluster 2)
-                      (heartbeat-workers cluster "sup1" [3])
-                      (heartbeat-workers cluster "sup2" [2])
-                      ))
-      (bind storm-id2 (StormCommon/getStormId (:storm-cluster-state cluster) "test2"))
-      (is (empty? (:shutdown changed)))
-      (validate-launched-once (:launched changed) {"sup1" [3] "sup2" [2]} storm-id2)
-      (bind changed (capture-changed-workers
-        (.killTopology (:nimbus cluster) "test")
-        (doseq [i (range 4)]
-          (advance-cluster-time cluster 8)
-          (heartbeat-workers cluster "sup1" [1 2 3])
-          (heartbeat-workers cluster "sup2" [1 2])
-          )))
-      (is (empty? (:shutdown changed)))
-      (is (empty? (:launched changed)))
-      (bind changed (capture-changed-workers
-        (advance-cluster-time cluster 12)
-        ))
-      (is (empty? (:launched changed)))
-      (is (= {["sup1" 1] 1 ["sup1" 2] 1 ["sup2" 1] 1} (:shutdown changed)))
-      (bind changed (capture-changed-workers
-        (doseq [i (range 10)]
-          (heartbeat-workers cluster "sup1" [3])
-          (heartbeat-workers cluster "sup2" [2])
-          (advance-cluster-time cluster 10)
-          )))
-      (is (empty? (:shutdown changed)))
-      (is (empty? (:launched changed)))
-      ;; TODO check that downloaded code is cleaned up only for the one storm
-      )))
-
-(defn get-heartbeat [cluster supervisor-id]
-  (clojurify-supervisor-info (.supervisorInfo (:storm-cluster-state cluster) supervisor-id)))
-
-(defn check-heartbeat [cluster supervisor-id within-secs]
-  (let [hb (get-heartbeat cluster supervisor-id)
-        time-secs (:time-secs hb)
-        now (Time/currentTimeSecs)
-        delta (- now time-secs)]
-    (is (>= delta 0))
-    (is (<= delta within-secs))
-    ))
-
-(deftest heartbeats-to-nimbus
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {SUPERVISOR-WORKER-START-TIMEOUT-SECS 15
-                  SUPERVISOR-HEARTBEAT-FREQUENCY-SECS 3}]
-    (letlocals
-      (bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
-      (advance-cluster-time cluster 4)
-      (bind hb (get-heartbeat cluster "sup"))
-      (is (= #{5 6 7} (set (:meta hb))))
-      (check-heartbeat cluster "sup" 3)
-      (advance-cluster-time cluster 3)
-      (check-heartbeat cluster "sup" 3)
-      (advance-cluster-time cluster 3)
-      (check-heartbeat cluster "sup" 3)
-      (advance-cluster-time cluster 15)
-      (check-heartbeat cluster "sup" 3)
-      (bind topology (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails
-                              (TestPlannerSpout. true) (Integer. 4))}
-                       {}))
-      ;; prevent them from launching by capturing them
-      (capture-changed-workers
-       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
-       (advance-cluster-time cluster 3)
-       (check-heartbeat cluster "sup" 3)
-       (advance-cluster-time cluster 3)
-       (check-heartbeat cluster "sup" 3)
-       (advance-cluster-time cluster 3)
-       (check-heartbeat cluster "sup" 3)
-       (advance-cluster-time cluster 20)
-       (check-heartbeat cluster "sup" 3))
-      )))
-
-(deftest test-worker-launch-command
-  (testing "*.worker.childopts configuration"
-    (let [mock-port 42
-          mock-storm-id "fake-storm-id"
-          mock-worker-id "fake-worker-id"
-          mock-cp (str Utils/FILE_PATH_SEPARATOR "base" Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar")
-          mock-sensitivity "S3"
-          builder (TopologyBuilder.)
-          \u2028_ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
-          \u2028_ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
-          \u2028mock-storm-topology (.createTopology builder)
-          exp-args-fn (fn [opts topo-opts classpath]
-                        (let [file-prefix (let [os (System/getProperty "os.name")]
-                                            (if (.startsWith os "Windows") (str "file:///")
-                                                    (str "")))
-                              sequences (concat [(SupervisorUtils/javaCmd "java") "-cp" classpath
-                                                (str "-Dlogfile.name=" "worker.log")
-                                                "-Dstorm.home="
-                                                (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
-                                                (str "-Dstorm.id=" mock-storm-id)
-                                                (str "-Dworker.id=" mock-worker-id)
-                                                (str "-Dworker.port=" mock-port)
-                                                (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
-                                                (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
-                                                 "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
-                                                "org.apache.storm.LogWriter"]
-                                         [(SupervisorUtils/javaCmd "java") "-server"]
-                                         opts
-                                         topo-opts
-                                         ["-Djava.library.path="
-                                          (str "-Dlogfile.name=" "worker.log")
-                                          "-Dstorm.home="
-                                          "-Dworkers.artifacts=/tmp/workers-artifacts"
-                                          "-Dstorm.conf.file="
-                                          "-Dstorm.options="
-                                          (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
-                                          (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
-                                          (str "-Dlogging.sensitivity=" mock-sensitivity)
-                                          (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
-                                          "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
-                                          (str "-Dstorm.id=" mock-storm-id)
-                                          (str "-Dworker.id=" mock-worker-id)
-                                          (str "-Dworker.port=" mock-port)
-                                          "-cp" classpath
-                                          "org.apache.storm.daemon.worker"
-                                          mock-storm-id
-                                          ""
-                                          mock-port
-                                          mock-worker-id])
-                          ret (ArrayList.)]
-                        (doseq [val sequences]
-                          (.add ret (str val)))
-                          ret))]
-      (testing "testing *.worker.childopts as strings with extra spaces"
-        (let [string-opts "-Dfoo=bar  -Xmx1024m"
-              topo-string-opts "-Dkau=aux   -Xmx2048m"
-              exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
-                                    ["-Dkau=aux" "-Xmx2048m"]
-                                    mock-cp)
-              mock-supervisor {STORM-CLUSTER-MODE :distributed
-                                      WORKER-CHILDOPTS string-opts}
-              mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                            topo-string-opts}
-              utils-spy (->>
-                          (proxy [Utils] []
-                            (addToClasspathImpl [classpath paths] mock-cp)
-                            (launchProcessImpl [& _] nil))
-                          Mockito/spy)
-              cu-proxy (proxy [ConfigUtils] []
-                          (supervisorStormDistRootImpl ([conf] nil)
-                                                       ([conf storm-id] nil))
-                          (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                          (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
-                          (setWorkerUserWSEImpl [conf worker-id user] nil)
-                          (workerRootImpl [conf] "/tmp/workers")
-                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
-              worker-manager (proxy [DefaultWorkerManager] []
-                               (jlp [stormRoot conf] ""))
-              process-proxy (proxy [SyncProcessEvent] []
-                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
-                              (createBlobstoreLinks [conf stormId workerId] nil))]
-
-          (with-open [_ (ConfigUtilsInstaller. cu-proxy)
-                      _ (UtilsInstaller. utils-spy)]
-                (.prepareWorker worker-manager mock-supervisor nil)
-                (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
-                                      "" mock-storm-id mock-port
-                                      mock-worker-id
-                                      (WorkerResources.) nil)
-                (. (Mockito/verify utils-spy)
-                   (launchProcessImpl (Matchers/eq exp-args)
-                                      (Matchers/any)
-                                      (Matchers/any)
-                                      (Matchers/any)
-                                      (Matchers/any))))))
-
-      (testing "testing *.worker.childopts as list of strings, with spaces in values"
-        (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
-              topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
-              exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
-              mock-supervisor  {STORM-CLUSTER-MODE :distributed
-                                      WORKER-CHILDOPTS list-opts}
-              mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                            topo-list-opts}
-              cu-proxy (proxy [ConfigUtils] []
-                          (supervisorStormDistRootImpl ([conf] nil)
-                                                       ([conf storm-id] nil))
-                          (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                          (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
-                          (setWorkerUserWSEImpl [conf worker-id user] nil)
-                          (workerRootImpl [conf] "/tmp/workers")
-                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
-              utils-spy (->>
-                          (proxy [Utils] []
-                            (addToClasspathImpl [classpath paths] mock-cp)
-                            (launchProcessImpl [& _] nil))
-                          Mockito/spy)
-              worker-manager (proxy [DefaultWorkerManager] []
-                               (jlp [stormRoot conf] ""))
-              process-proxy (proxy [SyncProcessEvent] []
-                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
-                              (createBlobstoreLinks [conf stormId workerId] nil))]
-            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
-                        _ (UtilsInstaller. utils-spy)]
-                  (.prepareWorker worker-manager mock-supervisor nil)
-                  (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
-                                            "" mock-storm-id
-                                            mock-port
-                                            mock-worker-id
-                                            (WorkerResources.) nil)
-                  (. (Mockito/verify utils-spy)
-                     (launchProcessImpl (Matchers/eq exp-args)
-                                        (Matchers/any)
-                                        (Matchers/any)
-                                        (Matchers/any)
-                                        (Matchers/any))))))
-
-      (testing "testing topology.classpath is added to classpath"
-        (let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
-              exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
-              mock-supervisor {STORM-CLUSTER-MODE :distributed}
-              mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
-              cu-proxy (proxy [ConfigUtils] []
-                          (supervisorStormDistRootImpl ([conf] nil)
-                                                       ([conf storm-id] nil))
-                          (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                          (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
-                          (setWorkerUserWSEImpl [conf worker-id user] nil)
-                          (workerRootImpl [conf] "/tmp/workers")
-                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
-              utils-spy (->>
-                          (proxy [Utils] []
-                            (currentClasspathImpl []
-                              (str Utils/FILE_PATH_SEPARATOR "base"))
-                            (launchProcessImpl [& _] nil))
-                          Mockito/spy)
-              worker-manager (proxy [DefaultWorkerManager] []
-                               (jlp [stormRoot conf] ""))
-              process-proxy (proxy [SyncProcessEvent] []
-                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
-                              (createBlobstoreLinks [conf stormId workerId] nil))]
-          (with-open [_ (ConfigUtilsInstaller. cu-proxy)
-                      _ (UtilsInstaller. utils-spy)]
-                  (.prepareWorker worker-manager mock-supervisor nil)
-                  (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
-                                               "" mock-storm-id
-                                              mock-port
-                                              mock-worker-id
-                                              (WorkerResources.) nil)
-                  (. (Mockito/verify utils-spy)
-                     (launchProcessImpl (Matchers/eq exp-args)
-                                        (Matchers/any)
-                                        (Matchers/any)
-                                        (Matchers/any)
-                                        (Matchers/any))))))
-      (testing "testing topology.environment is added to environment for worker launch"
-        (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
-              full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
-              exp-args (exp-args-fn [] [] mock-cp)
-              mock-supervisor {STORM-CLUSTER-MODE :distributed}
-              mocked-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
-              cu-proxy (proxy [ConfigUtils] []
-                          (supervisorStormDistRootImpl ([conf] nil)
-                                                       ([conf storm-id] nil))
-                          (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                          (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
-                          (setWorkerUserWSEImpl [conf worker-id user] nil)
-                          (workerRootImpl [conf] "/tmp/workers")
-                          (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
-              utils-spy (->>
-                          (proxy [Utils] []
-                            (currentClasspathImpl []
-                              (str Utils/FILE_PATH_SEPARATOR "base"))
-                            (launchProcessImpl [& _] nil))
-                          Mockito/spy)
-              worker-manager (proxy [DefaultWorkerManager] []
-                               (jlp [stormRoot conf] nil))
-              process-proxy (proxy [SyncProcessEvent] []
-                              (writeLogMetadata [stormconf user workerId stormId port conf] nil)
-                              (createBlobstoreLinks [conf stormId workerId] nil))]
-          (with-open [_ (ConfigUtilsInstaller. cu-proxy)
-                      _ (UtilsInstaller. utils-spy)]
-            (.prepareWorker worker-manager mock-supervisor nil)
-            (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
-                                        "" mock-storm-id
-                                        mock-port
-                                        mock-worker-id
-                                        (WorkerResources.) nil)
-              (. (Mockito/verify utils-spy)
-                 (launchProcessImpl (Matchers/any)
-                                    (Matchers/eq full-env)
-                                    (Matchers/any)
-                                    (Matchers/any)
-                                    (Matchers/any))))))
-
-      (testing "testing addToClasspath(Collection<String>, Collection<String>)"
-        (let [firstpart (java.util.ArrayList. ["a" "b" "c"])
-              secondpart (java.util.ArrayList. ["d" "e" "f"])
-              ^java.util.ArrayList nillist nil]
-          (is (= (Utils/addToClasspath firstpart secondpart)
-                 "a:b:c:d:e:f"))
-          (is (= (Utils/addToClasspath [] secondpart)
-                 "d:e:f"))
-          (is (= (Utils/addToClasspath nillist secondpart)
-                 "d:e:f"))
-          (is (= (Utils/addToClasspath firstpart [])
-                 "a:b:c"))
-          (is (= (Utils/addToClasspath firstpart nillist)
-                 "a:b:c"))
-          (is (= (Utils/addToClasspath [] [])
-                 ""))
-          (is (= (Utils/addToClasspath nillist nillist)
-                 "")))))))
-
-(deftest test-worker-launch-command-run-as-user
-  (testing "*.worker.childopts configuration"
-    (let [file-prefix (let [os (System/getProperty "os.name")]
-                        (if (.startsWith os "Windows") (str "file:///")
-                          (str "")))
-          mock-port 42
-          mock-storm-id "fake-storm-id"
-          mock-worker-id "fake-worker-id"
-          mock-sensitivity "S3"
-          mock-cp "mock-classpath'quote-on-purpose"
-          builder (TopologyBuilder.)
-          \u2028_ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
-          \u2028_ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
-          mock-storm-topology (.createTopology builder)
-          attrs (make-array FileAttribute 0)
-          storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
-          worker-script (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "storm-worker-script.sh")
-          exp-launch ["/bin/worker-launcher"
-                      "me"
-                      "worker"
-                      (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id)
-                      worker-script]
-          exp-script-fn (fn [opts topo-opts]
-                          (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
-                               " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
-                               " '-Dlogfile.name=" "worker.log'"
-                               " '-Dstorm.home='"
-                               " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
-                               " '-Dstorm.id=" mock-storm-id "'"
-                               " '-Dworker.id=" mock-worker-id "'"
-                               " '-Dworker.port=" mock-port "'"
-                               " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
-                               " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
-                               " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
-                               " 'org.apache.storm.LogWriter'"
-                               " 'java' '-server'"
-                               " " (Utils/shellCmd opts)
-                               " " (Utils/shellCmd topo-opts)
-                               " '-Djava.library.path='"
-                               " '-Dlogfile.name=" "worker.log'"
-                               " '-Dstorm.home='"
-                               " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
-                               " '-Dstorm.conf.file='"
-                               " '-Dstorm.options='"
-                               " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
-                               " '-Djava.io.tmpdir=" (str  storm-local "/workers/" mock-worker-id "/tmp'")
-                               " '-Dlogging.sensitivity=" mock-sensitivity "'"
-                               " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
-                               " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
-                               " '-Dstorm.id=" mock-storm-id "'"
-                               " '-Dworker.id=" mock-worker-id "'"
-                               " '-Dworker.port=" mock-port "'"
-                               " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
-                               " 'org.apache.storm.daemon.worker'"
-                               " '" mock-storm-id "'"
-                               " '""'"
-                               " '" mock-port "'"
-                               " '" mock-worker-id "';"))]
-      (try
-        (testing "testing *.worker.childopts as strings with extra spaces"
-          (let [string-opts "-Dfoo=bar  -Xmx1024m"
-                topo-string-opts "-Dkau=aux   -Xmx2048m"
-                exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
-                                          ["-Dkau=aux" "-Xmx2048m"])
-                _ (.mkdirs (io/file storm-local "workers" mock-worker-id))
-                mock-supervisor {STORM-CLUSTER-MODE :distributed
-                                        STORM-LOCAL-DIR storm-local
-                                        STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
-                                        SUPERVISOR-RUN-WORKER-AS-USER true
-                                        WORKER-CHILDOPTS string-opts}
-                mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                              topo-string-opts
-                                              TOPOLOGY-SUBMITTER-USER "me"}
-                cu-proxy (proxy [ConfigUtils] []
-                          (supervisorStormDistRootImpl ([conf] nil)
-                                                       ([conf storm-id] nil))
-                          (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                          (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
-                          (setWorkerUserWSEImpl [conf worker-id user] nil))
-                utils-spy (->>
-                            (proxy [Utils] []
-                              (addToClasspathImpl [classpath paths] mock-cp)
-                              (launchProcessImpl [& _] nil))
-                            Mockito/spy)
-                supervisor-utils (Mockito/mock SupervisorUtils)
-                worker-manager (proxy [DefaultWorkerManager] []
-                                 (jlp [stormRoot conf] ""))
-                process-proxy (proxy [SyncProcessEvent] []
-                                (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
-            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
-                        _ (UtilsInstaller. utils-spy)
-                        _ (MockedSupervisorUtils. supervisor-utils)]
-              (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
-              (.prepareWorker worker-manager mock-supervisor nil)
-              (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
-                                          "" mock-storm-id
-                                          mock-port
-                                          mock-worker-id
-                                          (WorkerResources.) nil)
-                (. (Mockito/verify utils-spy)
-                   (launchProcessImpl (Matchers/eq exp-launch)
-                                      (Matchers/any)
-                                      (Matchers/any)
-                                      (Matchers/any)
-                                      (Matchers/any))))
-            (is (= (slurp worker-script) exp-script))
-            ))
-        (finally (Utils/forceDelete storm-local)))
-      (.mkdirs (io/file storm-local "workers" mock-worker-id))
-      (try
-        (testing "testing *.worker.childopts as list of strings, with spaces in values"
-          (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
-                topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
-                exp-script (exp-script-fn list-opts topo-list-opts)
-                mock-supervisor {STORM-CLUSTER-MODE :distributed
-                                        STORM-LOCAL-DIR storm-local
-                                        STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
-                                        SUPERVISOR-RUN-WORKER-AS-USER true
-                                        WORKER-CHILDOPTS list-opts}
-                mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                              topo-list-opts
-                                              TOPOLOGY-SUBMITTER-USER "me"}
-                cu-proxy (proxy [ConfigUtils] []
-                          (supervisorStormDistRootImpl ([conf] nil)
-                                                       ([conf storm-id] nil))
-                          (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                          (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
-                          (setWorkerUserWSEImpl [conf worker-id user] nil))
-                utils-spy (->>
-                            (proxy [Utils] []
-                              (addToClasspathImpl [classpath paths] mock-cp)
-                              (launchProcessImpl [& _] nil))
-                            Mockito/spy)
-                supervisor-utils (Mockito/mock SupervisorUtils)
-                worker-manager (proxy [DefaultWorkerManager] []
-                                 (jlp [stormRoot conf] ""))
-                process-proxy (proxy [SyncProcessEvent] []
-                                (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
-            (with-open [_ (ConfigUtilsInstaller. cu-proxy)
-                        _ (UtilsInstaller. utils-spy)
-                        _ (MockedSupervisorUtils. supervisor-utils)]
-              (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
-              (.prepareWorker worker-manager mock-supervisor nil)
-              (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
-                                          "" mock-storm-id
-                                          mock-port
-                                          mock-worker-id
-                                          (WorkerResources.) nil)
-                (. (Mockito/verify utils-spy)
-                 (launchProcessImpl (Matchers/eq exp-launch)
-                                    (Matchers/any)
-                                    (Matchers/any)
-                                    (Matchers/any)
-                                    (Matchers/any))))
-            (is (= (slurp worker-script) exp-script))
-            ))
-        (finally (Utils/forceDelete storm-local))))))
-
-(deftest test-workers-go-bananas
-  ;; test that multiple workers are started for a port, and test that
-  ;; supervisor shuts down propertly (doesn't shutdown the most
-  ;; recently launched one, checks heartbeats correctly, etc.)
-  )
-
-(deftest downloads-code
-  )
-
-(deftest test-stateless
-  )
-
-(deftest cleans-up-on-unassign
-  ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
-  )
-
-(deftest test-supervisor-data-acls
-  (testing "supervisor-data uses correct ACLs"
-    (let [scheme "digest"
-          digest "storm:thisisapoorpassword"
-          auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
-                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest
-                     STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"}
-          expected-acls (SupervisorUtils/supervisorZkAcls)
-          fake-isupervisor (reify ISupervisor
-                             (getSupervisorId [this] nil)
-                             (getAssignmentId [this] nil))
-          fake-cu (proxy [ConfigUtils] []
-                    (supervisorStateImpl [conf] nil)
-                    (supervisorLocalDirImpl [conf] nil))
-          fake-utils (proxy [Utils] []
-                       (localHostnameImpl [] nil)
-                       (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
-                                                (upTime [] 0))))
-          cluster-utils (Mockito/mock ClusterUtils)]
-      (with-open [_ (ConfigUtilsInstaller. fake-cu)
-                  _ (UtilsInstaller. fake-utils)
-                  mocked-cluster (MockedCluster. cluster-utils)]
-          (SupervisorData. auth-conf nil fake-isupervisor)
-          (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))))))
-
-  (deftest test-write-log-metadata
-    (testing "supervisor writes correct data to logs metadata file"
-      (let [exp-owner "alice"
-            exp-worker-id "42"
-            exp-storm-id "0123456789"
-            exp-port 4242
-            exp-logs-users ["bob" "charlie" "daryl"]
-            exp-logs-groups ["read-only-group" "special-group"]
-            storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
-                        TOPOLOGY-USERS ["charlie" "bob"]
-                        TOPOLOGY-GROUPS ["special-group"]
-                        LOGS-GROUPS ["read-only-group"]
-                        LOGS-USERS ["daryl"]}
-            exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
-                      "worker-id" exp-worker-id
-                      LOGS-USERS exp-logs-users
-                      LOGS-GROUPS exp-logs-groups}
-            conf {}
-            process-proxy (->> (proxy [SyncProcessEvent] []
-                            (writeLogMetadataToYamlFile [stormId  port data conf] nil))
-                            Mockito/spy)]
-          (.writeLogMetadata process-proxy storm-conf exp-owner exp-worker-id
-            exp-storm-id  exp-port conf)
-        (.writeLogMetadataToYamlFile (Mockito/verify process-proxy (Mockito/times 1)) (Mockito/eq exp-storm-id) (Mockito/eq exp-port) (Mockito/any) (Mockito/eq conf)))))
-
-  (deftest test-worker-launcher-requires-user
-    (testing "worker-launcher throws on blank user"
-      (let [utils-proxy (proxy [Utils] []
-                          (launchProcessImpl [& _] nil))]
-        (with-open [_ (UtilsInstaller. utils-proxy)]
-          (is (try
-                (SupervisorUtils/processLauncher {} nil nil (ArrayList.) {} nil nil nil)
-                false
-                (catch Throwable t
-                  (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
-                       (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
-
-  (defn found? [sub-str input-str]
-    (if (string? input-str)
-      (contrib-str/substring? sub-str (str input-str))
-      (boolean (some #(contrib-str/substring? sub-str %) input-str))))
-
-  (defn not-found? [sub-str input-str]
-    (complement (found? sub-str input-str)))
-
-  (deftest test-substitute-childopts-happy-path-string
-    (testing "worker-launcher replaces ids in childopts"
-      (let [worker-id "w-01"
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
-            expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-substitute-childopts-happy-path-list
-    (testing "worker-launcher replaces ids in childopts"
-      (let [worker-id "w-01"
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
-            expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-substitute-childopts-happy-path-list-arraylist
-    (testing "worker-launcher replaces ids in childopts"
-      (let [worker-id "w-01"
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
-            expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-substitute-childopts-topology-id-alone
-    (testing "worker-launcher replaces ids in childopts"
-      (let [worker-id "w-01"
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
-            expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-substitute-childopts-no-keys
-    (testing "worker-launcher has no ids to replace in childopts"
-      (let [worker-id "w-01"
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
-            expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-substitute-childopts-nil-childopts
-    (testing "worker-launcher has nil childopts"
-      (let [worker-id "w-01"
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts nil
-            expected-childopts '[]
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-substitute-childopts-nil-ids
-    (testing "worker-launcher has nil ids"
-      (let [worker-id ""
-            topology-id "s-01"
-            port 9999
-            mem-onheap (int 512)
-            childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-            expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-            worker-manager (DefaultWorkerManager.)
-            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
-        (is (= expected-childopts childopts-with-ids)))))
-
-  (deftest test-retry-read-assignments
-    (with-simulated-time-local-cluster [cluster
-                                        :supervisors 0
-                                        :ports-per-supervisor 2
-                                        :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                      NIMBUS-MONITOR-FREQ-SECS 10
-                                                      TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                      TOPOLOGY-ACKER-EXECUTORS 0}]
-      (letlocals
-        (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-        (bind topology1 (Thrift/buildTopology
-                          {"1" (Thrift/prepareSpoutDetails
-                                 (TestPlannerSpout. true) (Integer. 2))}
-                          {}))
-        (bind topology2 (Thrift/buildTopology
-                          {"1" (Thrift/prepareSpoutDetails
-                                 (TestPlannerSpout. true) (Integer. 2))}
-                          {}))
-        (bind state (:storm-cluster-state cluster))
-        (bind changed (capture-changed-workers
-                        (submit-mocked-assignment
-                          (:nimbus cluster)
-                          (:storm-cluster-state cluster)
-                          "topology1"
-                          {TOPOLOGY-WORKERS 2}
-                          topology1
-                          {1 "1"
-                           2 "1"}
-                          {[1 1] ["sup1" 1]
-                           [2 2] ["sup1" 2]}
-                          {["sup1" 1] [0.0 0.0 0.0]
-                           ["sup1" 2] [0.0 0.0 0.0]
-                           })
-                        (submit-mocked-assignment
-                          (:nimbus cluster)
-                          (:storm-cluster-state cluster)
-                          "topology2"
-                          {TOPOLOGY-WORKERS 2}
-                          topology2
-                          {1 "1"
-                           2 "1"}
-                          {[1 1] ["sup1" 1]
-                           [2 2] ["sup1" 2]}
-                          {["sup1" 1] [0.0 0.0 0.0]
-                           ["sup1" 2] [0.0 0.0 0.0]
-                           })
-                        ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                        (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                        ))
-        (is (empty? (:launched changed)))
-        (bind options (RebalanceOptions.))
-        (.set_wait_secs options 0)
-        (bind changed (capture-changed-workers
-                        (.rebalance (:nimbus cluster) "topology2" options)
-                        (advance-cluster-time cluster 10)
-                        (heartbeat-workers cluster "sup1" [1 2 3 4])
-                        (advance-cluster-time cluster 10)
-                        ))
-        (validate-launched-once (:launched changed)
-          {"sup1" [1 2]}
-          (StormCommon/getStormId (:storm-cluster-state cluster) "topology1"))
-        (validate-launched-once (:launched changed)
-          {"sup1" [3 4]}
-          (StormCommon/getStormId (:storm-cluster-state cluster) "topology2"))
-        )))

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/TestCgroups.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestCgroups.java b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
index 0857ba9..d732c36 100644
--- a/storm-core/test/jvm/org/apache/storm/TestCgroups.java
+++ b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
@@ -31,7 +31,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +60,7 @@ public class TestCgroups {
         CgroupManager manager = new CgroupManager();
         manager.prepare(config);
 
-        Map<String, Object> resourcesMap = new HashMap<String, Object>();
+        Map<String, Number> resourcesMap = new HashMap<>();
         resourcesMap.put("cpu", 200);
         resourcesMap.put("memory", 1024);
         String workerId = UUID.randomUUID().toString();