You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/19 21:01:31 UTC
[2/8] storm git commit: STORM-2018: Supervisor V2
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index d85b06b..a80d3d2 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -85,7 +85,6 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -153,7 +152,7 @@ public class Utils {
return oldInstance;
}
- private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+ public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String DEFAULT_STREAM_ID = "default";
public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
public static final String CURRENT_BLOB_SUFFIX_ID = "current";
@@ -172,10 +171,11 @@ public class Utils {
public static final int SIGTERM = 15;
static {
- Map conf = readStormConfig();
+ Map<String, Object> conf = readStormConfig();
serializationDelegate = getSerializationDelegate(conf);
}
+ @SuppressWarnings("unchecked")
public static <T> T newInstance(String klass) {
try {
return newInstance((Class<T>)Class.forName(klass));
@@ -213,9 +213,9 @@ public class Utils {
return serializationDelegate.deserialize(serialized, clazz);
}
- public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) {
+ public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {
try {
- T ret = (T) c.newInstance();
+ T ret = c.newInstance();
TDeserializer des = getDes();
des.deserialize((TBase) ret, b, offset, length);
return ret;
@@ -353,16 +353,17 @@ public class Utils {
}
}
- public static Map findAndReadConfigFile(String name, boolean mustExist) {
+ public static Map<String, Object> findAndReadConfigFile(String name, boolean mustExist) {
InputStream in = null;
boolean confFileEmpty = false;
try {
in = getConfigFileInputStream(name);
if (null != in) {
Yaml yaml = new Yaml(new SafeConstructor());
- Map ret = (Map) yaml.load(new InputStreamReader(in));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> ret = (Map<String, Object>) yaml.load(new InputStreamReader(in));
if (null != ret) {
- return new HashMap(ret);
+ return new HashMap<>(ret);
} else {
confFileEmpty = true;
}
@@ -374,7 +375,7 @@ public class Utils {
else
throw new RuntimeException("Could not find config file on classpath " + name);
} else {
- return new HashMap();
+ return new HashMap<>();
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -416,16 +417,16 @@ public class Utils {
}
- public static Map findAndReadConfigFile(String name) {
+ public static Map<String, Object> findAndReadConfigFile(String name) {
return findAndReadConfigFile(name, true);
}
- public static Map readDefaultConfig() {
+ public static Map<String, Object> readDefaultConfig() {
return findAndReadConfigFile("defaults.yaml", true);
}
- public static Map readCommandLineOpts() {
- Map ret = new HashMap();
+ public static Map<String, Object> readCommandLineOpts() {
+ Map<String, Object> ret = new HashMap<>();
String commandOptions = System.getProperty("storm.options");
if (commandOptions != null) {
/*
@@ -456,10 +457,10 @@ public class Utils {
return ret;
}
- public static Map readStormConfig() {
- Map ret = readDefaultConfig();
+ public static Map<String, Object> readStormConfig() {
+ Map<String, Object> ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
- Map storm;
+ Map<String, Object> storm;
if (confFile == null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
@@ -568,6 +569,11 @@ public class Utils {
*/
public static void downloadResourcesAsSupervisor(String key, String localFile,
ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
+ _instance.downloadResourcesAsSupervisorImpl(key, localFile, cb);
+ }
+
+ public void downloadResourcesAsSupervisorImpl(String key, String localFile,
+ ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
final int MAX_RETRY_ATTEMPTS = 2;
final int ATTEMPTS_INTERVAL_TIME = 100;
for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) {
@@ -586,11 +592,8 @@ public class Utils {
private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) {
boolean isSuccess = false;
- FileOutputStream out = null;
- InputStreamWithMeta in = null;
- try {
- out = new FileOutputStream(localFile);
- in = cb.getBlob(key);
+ try (FileOutputStream out = new FileOutputStream(localFile);
+ InputStreamWithMeta in = cb.getBlob(key);) {
long fileSize = in.getFileLength();
byte[] buffer = new byte[1024];
@@ -604,17 +607,6 @@ public class Utils {
isSuccess = (fileSize == downloadFileSize);
} catch (TException | IOException e) {
LOG.error("An exception happened while downloading {} from blob store.", localFile, e);
- } finally {
- try {
- if (out != null) {
- out.close();
- }
- } catch (IOException ignored) {}
- try {
- if (in != null) {
- in.close();
- }
- } catch (IOException ignored) {}
}
if (!isSuccess) {
try {
@@ -626,6 +618,10 @@ public class Utils {
return isSuccess;
}
+ public static boolean checkFileExists(File path) {
+ return Files.exists(path.toPath());
+ }
+
public static boolean checkFileExists(String path) {
return Files.exists(new File(path).toPath());
}
@@ -781,7 +777,7 @@ public class Utils {
}
}
- public static <T> T thriftDeserialize(Class c, byte[] b) {
+ public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
try {
return Utils.thriftDeserialize(c, b, 0, b.length);
} catch (Exception e) {
@@ -1872,9 +1868,12 @@ public class Utils {
* @param jarpath Path to the jar file
* @param dir Directory in the jar to pull out
* @param destdir Path to the directory where the extracted directory will be put
- *
*/
- public static void extractDirFromJar(String jarpath, String dir, String destdir) {
+ public static void extractDirFromJar(String jarpath, String dir, File destdir) {
+ _instance.extractDirFromJarImpl(jarpath, dir, destdir);
+ }
+
+ public void extractDirFromJarImpl(String jarpath, String dir, File destdir) {
try (JarFile jarFile = new JarFile(jarpath)) {
Enumeration<JarEntry> jarEnums = jarFile.entries();
while (jarEnums.hasMoreElements()) {
@@ -1990,34 +1989,6 @@ public class Utils {
}
/**
- * Creates a symbolic link to the target
- * @param dir the parent directory of the link
- * @param targetDir the parent directory of the link's target
- * @param filename the file name of the link
- * @param targetFilename the file name of the links target
- * @throws IOException
- */
- public static void createSymlink(String dir, String targetDir,
- String filename, String targetFilename) throws IOException {
- Path path = Paths.get(dir, filename).toAbsolutePath();
- Path target = Paths.get(targetDir, targetFilename).toAbsolutePath();
- LOG.debug("Creating symlink [{}] to [{}]", path, target);
- if (!path.toFile().exists()) {
- Files.createSymbolicLink(path, target);
- }
- }
-
- /**
- * Convenience method for the case when the link's file name should be the
- * same as the file name of the target
- */
- public static void createSymlink(String dir, String targetDir,
- String targetFilename) throws IOException {
- Utils.createSymlink(dir, targetDir, targetFilename,
- targetFilename);
- }
-
- /**
* Returns a Collection of file names found under the given directory.
* @param dir a directory
* @return the Collection of file names
@@ -2047,56 +2018,6 @@ public class Utils {
return System.getProperty("java.class.path");
}
- /**
- * Returns a collection of jar file names found under the given directory.
- * @param dir the directory to search
- * @return the jar file names
- */
- private static List<String> getFullJars(String dir) {
- File[] files = new File(dir).listFiles(jarFilter);
-
- if(files == null) {
- return new ArrayList<>();
- }
-
- List<String> ret = new ArrayList<>(files.length);
- for (File f : files) {
- ret.add(Paths.get(dir, f.getName()).toString());
- }
- return ret;
- }
- private static final FilenameFilter jarFilter = new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.endsWith(".jar");
- }
- };
-
-
- public static String workerClasspath() {
- String stormDir = System.getProperty("storm.home");
-
- if (stormDir == null) {
- return Utils.currentClasspath();
- }
-
- String stormLibDir = Paths.get(stormDir, "lib").toString();
- String stormConfDir =
- System.getenv("STORM_CONF_DIR") != null ?
- System.getenv("STORM_CONF_DIR") :
- Paths.get(stormDir, "conf").toString();
- String stormExtlibDir = Paths.get(stormDir, "extlib").toString();
- String extcp = System.getenv("STORM_EXT_CLASSPATH");
- List<String> pathElements = new LinkedList<>();
- pathElements.addAll(Utils.getFullJars(stormLibDir));
- pathElements.addAll(Utils.getFullJars(stormExtlibDir));
- pathElements.add(extcp);
- pathElements.add(stormConfDir);
-
- return StringUtils.join(pathElements,
- CLASS_PATH_SEPARATOR);
- }
-
public static String addToClasspath(String classpath,
Collection<String> paths) {
return _instance.addToClasspathImpl(classpath, paths);
@@ -2281,78 +2202,6 @@ public class Utils {
null);
}
- /**
- * A callback that can accept an integer.
- * @param <V> the result type of method <code>call</code>
- */
- public interface ExitCodeCallable<V> extends Callable<V> {
- V call(int exitCode);
- }
-
- /**
- * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
- * callback.
- * @param command the command to be executed in the new process
- * @param environment the environment to be applied to the process. Can be
- * null.
- * @param logPrefix a prefix for log entries from the output of the process.
- * Can be null.
- * @param exitCodeCallback code to be called passing the exit code value
- * when the process completes
- * @param dir the working directory of the new process
- * @return the new process
- * @throws IOException
- * @see java.lang.ProcessBuilder
- */
- public static Process launchProcess(List<String> command,
- Map<String,String> environment,
- final String logPrefix,
- final ExitCodeCallable exitCodeCallback,
- File dir)
- throws IOException {
- return _instance.launchProcessImpl(command, environment, logPrefix,
- exitCodeCallback, dir);
- }
-
- public Process launchProcessImpl(
- List<String> command,
- Map<String,String> cmdEnv,
- final String logPrefix,
- final ExitCodeCallable exitCodeCallback,
- File dir)
- throws IOException {
- ProcessBuilder builder = new ProcessBuilder(command);
- Map<String,String> procEnv = builder.environment();
- if (dir != null) {
- builder.directory(dir);
- }
- builder.redirectErrorStream(true);
- if (cmdEnv != null) {
- procEnv.putAll(cmdEnv);
- }
- final Process process = builder.start();
- if (logPrefix != null || exitCodeCallback != null) {
- Utils.asyncLoop(new Callable() {
- public Object call() {
- if (logPrefix != null ) {
- Utils.readAndLogStream(logPrefix,
- process.getInputStream());
- }
- if (exitCodeCallback != null) {
- try {
- process.waitFor();
- } catch (InterruptedException ie) {
- LOG.info("{} interrupted", logPrefix);
- exitCodeCallback.call(process.exitValue());
- }
- }
- return null; // Run only once.
- }
- });
- }
- return process;
- }
-
public static <T> List<T> interleaveAll(List<List<T>> nodeList) {
if (nodeList != null && nodeList.size() > 0) {
List<T> first = new ArrayList<T>();
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 7723922..0580f41 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -36,6 +36,7 @@ import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.VersionedData;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.ConfigUtils;
@@ -388,8 +389,15 @@ public class Zookeeper {
leaderLatchListenerAtomicReference, blobStore);
}
- public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
- Map map = new HashMap();
+ /**
+ * Get the data along with a version
+ * @param zk the zk instance to use
+ * @param path the path to get it from
+ * @param watch should a watch be enabled
+ * @return null if no data is found, else the data with the version.
+ */
+ public static VersionedData<byte[]> getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
+ VersionedData<byte[]> data = null;
try {
byte[] bytes = null;
Stat stats = new Stat();
@@ -402,8 +410,7 @@ public class Zookeeper {
}
if (bytes != null) {
int version = stats.getVersion();
- map.put(IStateStorage.DATA, bytes);
- map.put(IStateStorage.VERSION, version);
+ data = new VersionedData<>(version, bytes);
}
}
} catch (Exception e) {
@@ -413,7 +420,7 @@ public class Zookeeper {
Utils.wrapInRuntime(e);
}
}
- return map;
+ return data;
}
public static List<String> tokenizePath(String path) {
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 787fbcd..90c2697 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -609,70 +609,3 @@
(report-error! collector (RuntimeException.)))
(ack! collector tuple))
-(deftest test-throttled-errors
- (with-simulated-time
- (with-tracked-cluster [cluster]
- (let [state (:storm-cluster-state cluster)
- [feeder checker] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
- (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails feeder)}
- {"2" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "1" nil)
- (Thrift/prepareShuffleGrouping)}
- report-errors-bolt)}))
- _ (submit-local-topology (:nimbus cluster)
- "test-errors"
- {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
- TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
- TOPOLOGY-DEBUG true
- }
- (:topology tracked))
- _ (advance-cluster-time cluster 11)
- storm-id (StormCommon/getStormId state "test-errors")
- errors-count (fn [] (count (.errors state storm-id "2")))]
-
- (is (nil? (clojurify-error (.lastError state storm-id "2"))))
-
- ;; so it launches the topology
- (advance-cluster-time cluster 2)
- (.feed feeder [6])
- (tracked-wait tracked 1)
- (is (= 4 (errors-count)))
- (is (clojurify-error (.lastError state storm-id "2")))
-
- (advance-time-secs! 5)
- (.feed feeder [2])
- (tracked-wait tracked 1)
- (is (= 4 (errors-count)))
- (is (clojurify-error (.lastError state storm-id "2")))
-
- (advance-time-secs! 6)
- (.feed feeder [2])
- (tracked-wait tracked 1)
- (is (= 6 (errors-count)))
- (is (clojurify-error (.lastError state storm-id "2")))
-
- (advance-time-secs! 6)
- (.feed feeder [3])
- (tracked-wait tracked 1)
- (is (= 8 (errors-count)))
- (is (clojurify-error (.lastError state storm-id "2")))))))
-
-
-(deftest test-acking-branching-complex
- ;; test acking with branching in the topology
- )
-
-
-(deftest test-fields-grouping
- ;; 1. put a shitload of random tuples through it and test that counts are right
- ;; 2. test that different spouts with different phints group the same way
- )
-
-(deftest test-all-grouping
- )
-
-(deftest test-direct-grouping
- )
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 3308653..c993d5b 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -281,13 +281,13 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
- (assert-acked tracker 1)
(assert-buckets! "myspout" "__fail-count/default" [] cluster)
(assert-buckets! "myspout" "__ack-count/default" [1] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
+ (assert-acked tracker 1)
(.feed feeder ["b"] 2)
(advance-cluster-time cluster 5)
@@ -337,12 +337,12 @@
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 9)
- (assert-acked tracker 1 3)
(assert-buckets! "myspout" "__ack-count/default" [2] cluster)
(assert-buckets! "myspout" "__emit-count/default" [3] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
+ (assert-acked tracker 1 3)
(is (not (.isFailed tracker 2)))
(advance-cluster-time cluster 30)
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index a9548d5..ac61390 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1188,6 +1188,14 @@
(assert-files-in-dir [])
))))
+(defn wait-for-status [nimbus name status]
+ (while-timeout 5000
+ (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
+ topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
+ (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
+ (not= topo-status status))
+ (Thread/sleep 100)))
+
(deftest test-leadership
"Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
(with-inprocess-zookeeper zk-port
@@ -1218,7 +1226,7 @@
(submit-local-topology nimbus "t1" {} topology)
;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
(.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- (Thread/sleep 1000)
+ (wait-for-status nimbus "t1" "ACTIVE")
(.deactivate nimbus "t1")
(.activate nimbus "t1")
(.rebalance nimbus "t1" (RebalanceOptions.))
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
index d0dfe2d..af492c6 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
@@ -48,10 +48,10 @@
ThriftConnectionType/DRPC_INVOCATIONS)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
(log-message "storm conf:" conf)
- (log-message "Starting DRPC invocation server ...")
+ (log-message "Starting DRPC invocation server ... " invocations-port)
(.start (Thread. #(.serve invoke-server)))
(wait-for-condition #(.isServing invoke-server))
- (log-message "Starting DRPC handler server ...")
+ (log-message "Starting DRPC handler server ... " client-port)
(.start (Thread. #(.serve handler-server)))
(wait-for-condition #(.isServing handler-server))
[handler-server invoke-server]))
@@ -100,42 +100,42 @@
(defmacro with-simple-drpc-test-scenario
[[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
- (let [client-port (Utils/getAvailablePort)
- invocations-port (Utils/getAvailablePort (int (inc client-port)))
- storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {DRPC-AUTHORIZER-ACL-STRICT strict?
+ `(let [client-port# (Utils/getAvailablePort)
+ invocations-port# (Utils/getAvailablePort (int (inc client-port#)))
+ storm-conf# (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {DRPC-AUTHORIZER-ACL-STRICT ~strict?
DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})]
- `(with-server [~storm-conf
+ (with-server [storm-conf#
"org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
"org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
"test/clj/org/apache/storm/security/auth/drpc-auth-server.jaas"
- ~client-port ~invocations-port]
+ client-port# invocations-port#]
(let [~alice-client (DRPCClient.
- (merge ~storm-conf {"java.security.auth.login.config"
+ (merge storm-conf# {"java.security.auth.login.config"
"test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
"localhost"
- ~client-port)
+ client-port#)
~bob-client (DRPCClient.
- (merge ~storm-conf {"java.security.auth.login.config"
+ (merge storm-conf# {"java.security.auth.login.config"
"test/clj/org/apache/storm/security/auth/drpc-auth-bob.jaas"})
"localhost"
- ~client-port)
+ client-port#)
~charlie-client (DRPCClient.
- (merge ~storm-conf {"java.security.auth.login.config"
+ (merge storm-conf# {"java.security.auth.login.config"
"test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
"localhost"
- ~client-port)
+ client-port#)
~alice-invok (DRPCInvocationsClient.
- (merge ~storm-conf {"java.security.auth.login.config"
+ (merge storm-conf# {"java.security.auth.login.config"
"test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
"localhost"
- ~invocations-port)
+ invocations-port#)
~charlie-invok (DRPCInvocationsClient.
- (merge ~storm-conf {"java.security.auth.login.config"
+ (merge storm-conf# {"java.security.auth.login.config"
"test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
"localhost"
- ~invocations-port)]
+ invocations-port#)]
(try
~@body
(finally
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
deleted file mode 100644
index cedbd8e..0000000
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ /dev/null
@@ -1,926 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.supervisor-test
- (:use [clojure test])
- (:require [conjure.core])
- (:use [conjure core])
- (:require [clojure.contrib [string :as contrib-str]])
- (:require [clojure [string :as string] [set :as set]])
- (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout]
- [org.apache.storm.daemon.supervisor SupervisorUtils SyncProcessEvent SupervisorData]
- [java.util ArrayList Arrays HashMap]
- [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
- [org.apache.storm.daemon.supervisor.workermanager DefaultWorkerManager])
- (:import [org.apache.storm.scheduler ISupervisor])
- (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
- (:import [org.apache.storm.generated RebalanceOptions WorkerResources])
- (:import [org.apache.storm.testing.staticmocking MockedCluster])
- (:import [java.util UUID])
- (:import [org.apache.storm Thrift])
- (:import [org.mockito Mockito Matchers])
- (:import [org.mockito.exceptions.base MockitoAssertionError])
- (:import [java.io File])
- (:import [java.nio.file Files])
- (:import [org.apache.storm.utils Utils IPredicate])
- (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
- [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
- (:import [java.nio.file.attribute FileAttribute] (org.apache.storm.topology TopologyBuilder))
- (:import [org.apache.storm.daemon StormCommon])
- (:use [org.apache.storm config testing util log converter])
- (:use [org.apache.storm.daemon common])
- (:require [org.apache.storm.daemon [worker :as worker] [local-supervisor :as local-supervisor]])
- (:use [conjure core])
- (:require [clojure.java.io :as io]))
-
-(defn worker-assignment
- "Return [storm-id executors]"
- [cluster supervisor-id port]
- (let [state (:storm-cluster-state cluster)
- slot-assigns (for [storm-id (.assignments state nil)]
- (let [executors (-> (clojurify-assignment (.assignmentInfo state storm-id nil))
- :executor->node+port
- (Utils/reverseMap)
- clojurify-structure
- (get [supervisor-id port] ))]
- (when executors [storm-id executors])
- ))
- pred (reify IPredicate (test [this x] (not-nil? x)))
- ret (Utils/findOne pred slot-assigns)]
- (when-not ret
- (throw (RuntimeException. "Could not find assignment for worker")))
- ret
- ))
-
-(defn heartbeat-worker [supervisor port storm-id executors]
- (let [conf (.getConf supervisor)]
- (worker/do-heartbeat {:conf conf
- :port port
- :storm-id storm-id
- :executors executors
- :worker-id (find-worker-id conf port)})))
-
-(defn heartbeat-workers [cluster supervisor-id ports]
- (let [sup (get-supervisor cluster supervisor-id)]
- (doseq [p ports]
- (let [[storm-id executors] (worker-assignment cluster supervisor-id p)]
- (heartbeat-worker sup p storm-id executors)
- ))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn validate-launched-once [launched supervisor->ports storm-id]
- (let [counts (map count (vals launched))
- launched-supervisor->ports (apply merge-with set/union
- (for [[[s p] sids] launched
- :when (some #(= % storm-id) sids)]
- {s #{p}}))
- supervisor->ports (map-val set supervisor->ports)]
- (is (every? (partial = 1) counts))
- (is (= launched-supervisor->ports supervisor->ports))
- ))
-
-(defmacro letlocals
- [& body]
- (let [[tobind lexpr] (split-at (dec (count body)) body)
- binded (vec (mapcat (fn [e]
- (if (and (list? e) (= 'bind (first e)))
- [(second e) (last e)]
- ['_ e]
- ))
- tobind))]
- `(let ~binded
- ~(first lexpr))))
-
-(deftest launches-assignment
- (with-simulated-time-local-cluster [cluster :supervisors 0
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
- SUPERVISOR-WORKER-TIMEOUT-SECS 15
- SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
- (letlocals
- (bind topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 4))}
- {}))
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "test"
- {TOPOLOGY-WORKERS 3}
- topology
- {1 "1"
- 2 "1"
- 3 "1"
- 4 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]
- [3 3] ["sup1" 3]
- [4 4] ["sup1" 3]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- ["sup1" 3] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- (advance-cluster-time cluster 2)
- (heartbeat-workers cluster "sup1" [1 2 3])
- (advance-cluster-time cluster 10)))
- (bind storm-id (StormCommon/getStormId (:storm-cluster-state cluster) "test"))
- (is (empty? (:shutdown changed)))
- (validate-launched-once (:launched changed) {"sup1" [1 2 3]} storm-id)
- (bind changed (capture-changed-workers
- (doseq [i (range 10)]
- (heartbeat-workers cluster "sup1" [1 2 3])
- (advance-cluster-time cluster 10))
- ))
- (is (empty? (:shutdown changed)))
- (is (empty? (:launched changed)))
- (bind changed (capture-changed-workers
- (heartbeat-workers cluster "sup1" [1 2])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed) {"sup1" [3]} storm-id)
- (is (= {["sup1" 3] 1} (:shutdown changed)))
- )))
-
-(deftest test-multiple-active-storms-multiple-supervisors
- (with-simulated-time-local-cluster [cluster :supervisors 0
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
- SUPERVISOR-WORKER-TIMEOUT-SECS 15
- SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
- (letlocals
- (bind topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 4))}
- {}))
- (bind topology2 (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 3))}
- {}))
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2]))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "test"
- {TOPOLOGY-WORKERS 3 TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}
- topology
- {1 "1"
- 2 "1"
- 3 "1"
- 4 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]
- [3 3] ["sup2" 1]
- [4 4] ["sup2" 1]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- ["sup2" 1] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- (advance-cluster-time cluster 2)
- (heartbeat-workers cluster "sup1" [1 2])
- (heartbeat-workers cluster "sup2" [1])
- ))
- (bind storm-id (StormCommon/getStormId (:storm-cluster-state cluster) "test"))
- (is (empty? (:shutdown changed)))
- (validate-launched-once (:launched changed) {"sup1" [1 2] "sup2" [1]} storm-id)
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "test2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"
- 3 "1"}
- {[1 1] ["sup1" 3]
- [2 2] ["sup1" 3]
- [3 3] ["sup2" 2]}
- {["sup1" 3] [0.0 0.0 0.0]
- ["sup2" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "test2" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- (advance-cluster-time cluster 2)
- (heartbeat-workers cluster "sup1" [3])
- (heartbeat-workers cluster "sup2" [2])
- ))
- (bind storm-id2 (StormCommon/getStormId (:storm-cluster-state cluster) "test2"))
- (is (empty? (:shutdown changed)))
- (validate-launched-once (:launched changed) {"sup1" [3] "sup2" [2]} storm-id2)
- (bind changed (capture-changed-workers
- (.killTopology (:nimbus cluster) "test")
- (doseq [i (range 4)]
- (advance-cluster-time cluster 8)
- (heartbeat-workers cluster "sup1" [1 2 3])
- (heartbeat-workers cluster "sup2" [1 2])
- )))
- (is (empty? (:shutdown changed)))
- (is (empty? (:launched changed)))
- (bind changed (capture-changed-workers
- (advance-cluster-time cluster 12)
- ))
- (is (empty? (:launched changed)))
- (is (= {["sup1" 1] 1 ["sup1" 2] 1 ["sup2" 1] 1} (:shutdown changed)))
- (bind changed (capture-changed-workers
- (doseq [i (range 10)]
- (heartbeat-workers cluster "sup1" [3])
- (heartbeat-workers cluster "sup2" [2])
- (advance-cluster-time cluster 10)
- )))
- (is (empty? (:shutdown changed)))
- (is (empty? (:launched changed)))
- ;; TODO check that downloaded code is cleaned up only for the one storm
- )))
-
-(defn get-heartbeat [cluster supervisor-id]
- (clojurify-supervisor-info (.supervisorInfo (:storm-cluster-state cluster) supervisor-id)))
-
-(defn check-heartbeat [cluster supervisor-id within-secs]
- (let [hb (get-heartbeat cluster supervisor-id)
- time-secs (:time-secs hb)
- now (Time/currentTimeSecs)
- delta (- now time-secs)]
- (is (>= delta 0))
- (is (<= delta within-secs))
- ))
-
-(deftest heartbeats-to-nimbus
- (with-simulated-time-local-cluster [cluster :supervisors 0
- :daemon-conf {SUPERVISOR-WORKER-START-TIMEOUT-SECS 15
- SUPERVISOR-HEARTBEAT-FREQUENCY-SECS 3}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
- (advance-cluster-time cluster 4)
- (bind hb (get-heartbeat cluster "sup"))
- (is (= #{5 6 7} (set (:meta hb))))
- (check-heartbeat cluster "sup" 3)
- (advance-cluster-time cluster 3)
- (check-heartbeat cluster "sup" 3)
- (advance-cluster-time cluster 3)
- (check-heartbeat cluster "sup" 3)
- (advance-cluster-time cluster 15)
- (check-heartbeat cluster "sup" 3)
- (bind topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 4))}
- {}))
- ;; prevent them from launching by capturing them
- (capture-changed-workers
- (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
- (advance-cluster-time cluster 3)
- (check-heartbeat cluster "sup" 3)
- (advance-cluster-time cluster 3)
- (check-heartbeat cluster "sup" 3)
- (advance-cluster-time cluster 3)
- (check-heartbeat cluster "sup" 3)
- (advance-cluster-time cluster 20)
- (check-heartbeat cluster "sup" 3))
- )))
-
-(deftest test-worker-launch-command
- (testing "*.worker.childopts configuration"
- (let [mock-port 42
- mock-storm-id "fake-storm-id"
- mock-worker-id "fake-worker-id"
- mock-cp (str Utils/FILE_PATH_SEPARATOR "base" Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar")
- mock-sensitivity "S3"
- builder (TopologyBuilder.)
- _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
- _ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
- mock-storm-topology (.createTopology builder)
- exp-args-fn (fn [opts topo-opts classpath]
- (let [file-prefix (let [os (System/getProperty "os.name")]
- (if (.startsWith os "Windows") (str "file:///")
- (str "")))
- sequences (concat [(SupervisorUtils/javaCmd "java") "-cp" classpath
- (str "-Dlogfile.name=" "worker.log")
- "-Dstorm.home="
- (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
- (str "-Dstorm.id=" mock-storm-id)
- (str "-Dworker.id=" mock-worker-id)
- (str "-Dworker.port=" mock-port)
- (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
- (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
- "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
- "org.apache.storm.LogWriter"]
- [(SupervisorUtils/javaCmd "java") "-server"]
- opts
- topo-opts
- ["-Djava.library.path="
- (str "-Dlogfile.name=" "worker.log")
- "-Dstorm.home="
- "-Dworkers.artifacts=/tmp/workers-artifacts"
- "-Dstorm.conf.file="
- "-Dstorm.options="
- (str "-Dstorm.log.dir=" (ConfigUtils/getLogDir))
- (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
- (str "-Dlogging.sensitivity=" mock-sensitivity)
- (str "-Dlog4j.configurationFile=" file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
- "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
- (str "-Dstorm.id=" mock-storm-id)
- (str "-Dworker.id=" mock-worker-id)
- (str "-Dworker.port=" mock-port)
- "-cp" classpath
- "org.apache.storm.daemon.worker"
- mock-storm-id
- ""
- mock-port
- mock-worker-id])
- ret (ArrayList.)]
- (doseq [val sequences]
- (.add ret (str val)))
- ret))]
- (testing "testing *.worker.childopts as strings with extra spaces"
- (let [string-opts "-Dfoo=bar -Xmx1024m"
- topo-string-opts "-Dkau=aux -Xmx2048m"
- exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
- ["-Dkau=aux" "-Xmx2048m"]
- mock-cp)
- mock-supervisor {STORM-CLUSTER-MODE :distributed
- WORKER-CHILDOPTS string-opts}
- mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-string-opts}
- utils-spy (->>
- (proxy [Utils] []
- (addToClasspathImpl [classpath paths] mock-cp)
- (launchProcessImpl [& _] nil))
- Mockito/spy)
- cu-proxy (proxy [ConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
- (setWorkerUserWSEImpl [conf worker-id user] nil)
- (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
- worker-manager (proxy [DefaultWorkerManager] []
- (jlp [stormRoot conf] ""))
- process-proxy (proxy [SyncProcessEvent] []
- (writeLogMetadata [stormconf user workerId stormId port conf] nil)
- (createBlobstoreLinks [conf stormId workerId] nil))]
-
- (with-open [_ (ConfigUtilsInstaller. cu-proxy)
- _ (UtilsInstaller. utils-spy)]
- (.prepareWorker worker-manager mock-supervisor nil)
- (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
- "" mock-storm-id mock-port
- mock-worker-id
- (WorkerResources.) nil)
- (. (Mockito/verify utils-spy)
- (launchProcessImpl (Matchers/eq exp-args)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any))))))
-
- (testing "testing *.worker.childopts as list of strings, with spaces in values"
- (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
- topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
- exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
- mock-supervisor {STORM-CLUSTER-MODE :distributed
- WORKER-CHILDOPTS list-opts}
- mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts}
- cu-proxy (proxy [ConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
- (setWorkerUserWSEImpl [conf worker-id user] nil)
- (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
- utils-spy (->>
- (proxy [Utils] []
- (addToClasspathImpl [classpath paths] mock-cp)
- (launchProcessImpl [& _] nil))
- Mockito/spy)
- worker-manager (proxy [DefaultWorkerManager] []
- (jlp [stormRoot conf] ""))
- process-proxy (proxy [SyncProcessEvent] []
- (writeLogMetadata [stormconf user workerId stormId port conf] nil)
- (createBlobstoreLinks [conf stormId workerId] nil))]
- (with-open [_ (ConfigUtilsInstaller. cu-proxy)
- _ (UtilsInstaller. utils-spy)]
- (.prepareWorker worker-manager mock-supervisor nil)
- (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
- "" mock-storm-id
- mock-port
- mock-worker-id
- (WorkerResources.) nil)
- (. (Mockito/verify utils-spy)
- (launchProcessImpl (Matchers/eq exp-args)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any))))))
-
- (testing "testing topology.classpath is added to classpath"
- (let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
- exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
- mock-supervisor {STORM-CLUSTER-MODE :distributed}
- mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
- cu-proxy (proxy [ConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
- (setWorkerUserWSEImpl [conf worker-id user] nil)
- (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
- utils-spy (->>
- (proxy [Utils] []
- (currentClasspathImpl []
- (str Utils/FILE_PATH_SEPARATOR "base"))
- (launchProcessImpl [& _] nil))
- Mockito/spy)
- worker-manager (proxy [DefaultWorkerManager] []
- (jlp [stormRoot conf] ""))
- process-proxy (proxy [SyncProcessEvent] []
- (writeLogMetadata [stormconf user workerId stormId port conf] nil)
- (createBlobstoreLinks [conf stormId workerId] nil))]
- (with-open [_ (ConfigUtilsInstaller. cu-proxy)
- _ (UtilsInstaller. utils-spy)]
- (.prepareWorker worker-manager mock-supervisor nil)
- (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
- "" mock-storm-id
- mock-port
- mock-worker-id
- (WorkerResources.) nil)
- (. (Mockito/verify utils-spy)
- (launchProcessImpl (Matchers/eq exp-args)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any))))))
- (testing "testing topology.environment is added to environment for worker launch"
- (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
- full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
- exp-args (exp-args-fn [] [] mock-cp)
- mock-supervisor {STORM-CLUSTER-MODE :distributed}
- mocked-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
- cu-proxy (proxy [ConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
- (setWorkerUserWSEImpl [conf worker-id user] nil)
- (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
- utils-spy (->>
- (proxy [Utils] []
- (currentClasspathImpl []
- (str Utils/FILE_PATH_SEPARATOR "base"))
- (launchProcessImpl [& _] nil))
- Mockito/spy)
- worker-manager (proxy [DefaultWorkerManager] []
- (jlp [stormRoot conf] nil))
- process-proxy (proxy [SyncProcessEvent] []
- (writeLogMetadata [stormconf user workerId stormId port conf] nil)
- (createBlobstoreLinks [conf stormId workerId] nil))]
- (with-open [_ (ConfigUtilsInstaller. cu-proxy)
- _ (UtilsInstaller. utils-spy)]
- (.prepareWorker worker-manager mock-supervisor nil)
- (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
- "" mock-storm-id
- mock-port
- mock-worker-id
- (WorkerResources.) nil)
- (. (Mockito/verify utils-spy)
- (launchProcessImpl (Matchers/any)
- (Matchers/eq full-env)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any))))))
-
- (testing "testing addToClasspath(Collection<String>, Collection<String>)"
- (let [firstpart (java.util.ArrayList. ["a" "b" "c"])
- secondpart (java.util.ArrayList. ["d" "e" "f"])
- ^java.util.ArrayList nillist nil]
- (is (= (Utils/addToClasspath firstpart secondpart)
- "a:b:c:d:e:f"))
- (is (= (Utils/addToClasspath [] secondpart)
- "d:e:f"))
- (is (= (Utils/addToClasspath nillist secondpart)
- "d:e:f"))
- (is (= (Utils/addToClasspath firstpart [])
- "a:b:c"))
- (is (= (Utils/addToClasspath firstpart nillist)
- "a:b:c"))
- (is (= (Utils/addToClasspath [] [])
- ""))
- (is (= (Utils/addToClasspath nillist nillist)
- "")))))))
-
-(deftest test-worker-launch-command-run-as-user
- (testing "*.worker.childopts configuration"
- (let [file-prefix (let [os (System/getProperty "os.name")]
- (if (.startsWith os "Windows") (str "file:///")
- (str "")))
- mock-port 42
- mock-storm-id "fake-storm-id"
- mock-worker-id "fake-worker-id"
- mock-sensitivity "S3"
- mock-cp "mock-classpath'quote-on-purpose"
- builder (TopologyBuilder.)
- _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
- _ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
- mock-storm-topology (.createTopology builder)
- attrs (make-array FileAttribute 0)
- storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
- worker-script (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "storm-worker-script.sh")
- exp-launch ["/bin/worker-launcher"
- "me"
- "worker"
- (str storm-local Utils/FILE_PATH_SEPARATOR "workers" Utils/FILE_PATH_SEPARATOR mock-worker-id)
- worker-script]
- exp-script-fn (fn [opts topo-opts]
- (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
- " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
- " '-Dlogfile.name=" "worker.log'"
- " '-Dstorm.home='"
- " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
- " '-Dstorm.id=" mock-storm-id "'"
- " '-Dworker.id=" mock-worker-id "'"
- " '-Dworker.port=" mock-port "'"
- " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
- " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
- " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
- " 'org.apache.storm.LogWriter'"
- " 'java' '-server'"
- " " (Utils/shellCmd opts)
- " " (Utils/shellCmd topo-opts)
- " '-Djava.library.path='"
- " '-Dlogfile.name=" "worker.log'"
- " '-Dstorm.home='"
- " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
- " '-Dstorm.conf.file='"
- " '-Dstorm.options='"
- " '-Dstorm.log.dir=" (ConfigUtils/getLogDir) "'"
- " '-Djava.io.tmpdir=" (str storm-local "/workers/" mock-worker-id "/tmp'")
- " '-Dlogging.sensitivity=" mock-sensitivity "'"
- " '-Dlog4j.configurationFile=" (str file-prefix Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml'")
- " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
- " '-Dstorm.id=" mock-storm-id "'"
- " '-Dworker.id=" mock-worker-id "'"
- " '-Dworker.port=" mock-port "'"
- " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
- " 'org.apache.storm.daemon.worker'"
- " '" mock-storm-id "'"
- " '""'"
- " '" mock-port "'"
- " '" mock-worker-id "';"))]
- (try
- (testing "testing *.worker.childopts as strings with extra spaces"
- (let [string-opts "-Dfoo=bar -Xmx1024m"
- topo-string-opts "-Dkau=aux -Xmx2048m"
- exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
- ["-Dkau=aux" "-Xmx2048m"])
- _ (.mkdirs (io/file storm-local "workers" mock-worker-id))
- mock-supervisor {STORM-CLUSTER-MODE :distributed
- STORM-LOCAL-DIR storm-local
- STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
- SUPERVISOR-RUN-WORKER-AS-USER true
- WORKER-CHILDOPTS string-opts}
- mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-string-opts
- TOPOLOGY-SUBMITTER-USER "me"}
- cu-proxy (proxy [ConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
- (setWorkerUserWSEImpl [conf worker-id user] nil))
- utils-spy (->>
- (proxy [Utils] []
- (addToClasspathImpl [classpath paths] mock-cp)
- (launchProcessImpl [& _] nil))
- Mockito/spy)
- supervisor-utils (Mockito/mock SupervisorUtils)
- worker-manager (proxy [DefaultWorkerManager] []
- (jlp [stormRoot conf] ""))
- process-proxy (proxy [SyncProcessEvent] []
- (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
- (with-open [_ (ConfigUtilsInstaller. cu-proxy)
- _ (UtilsInstaller. utils-spy)
- _ (MockedSupervisorUtils. supervisor-utils)]
- (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
- (.prepareWorker worker-manager mock-supervisor nil)
- (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
- "" mock-storm-id
- mock-port
- mock-worker-id
- (WorkerResources.) nil)
- (. (Mockito/verify utils-spy)
- (launchProcessImpl (Matchers/eq exp-launch)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any))))
- (is (= (slurp worker-script) exp-script))
- ))
- (finally (Utils/forceDelete storm-local)))
- (.mkdirs (io/file storm-local "workers" mock-worker-id))
- (try
- (testing "testing *.worker.childopts as list of strings, with spaces in values"
- (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
- topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
- exp-script (exp-script-fn list-opts topo-list-opts)
- mock-supervisor {STORM-CLUSTER-MODE :distributed
- STORM-LOCAL-DIR storm-local
- STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
- SUPERVISOR-RUN-WORKER-AS-USER true
- WORKER-CHILDOPTS list-opts}
- mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts
- TOPOLOGY-SUBMITTER-USER "me"}
- cu-proxy (proxy [ConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (readSupervisorTopologyImpl [conf storm-id] mock-storm-topology)
- (setWorkerUserWSEImpl [conf worker-id user] nil))
- utils-spy (->>
- (proxy [Utils] []
- (addToClasspathImpl [classpath paths] mock-cp)
- (launchProcessImpl [& _] nil))
- Mockito/spy)
- supervisor-utils (Mockito/mock SupervisorUtils)
- worker-manager (proxy [DefaultWorkerManager] []
- (jlp [stormRoot conf] ""))
- process-proxy (proxy [SyncProcessEvent] []
- (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
- (with-open [_ (ConfigUtilsInstaller. cu-proxy)
- _ (UtilsInstaller. utils-spy)
- _ (MockedSupervisorUtils. supervisor-utils)]
- (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
- (.prepareWorker worker-manager mock-supervisor nil)
- (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
- "" mock-storm-id
- mock-port
- mock-worker-id
- (WorkerResources.) nil)
- (. (Mockito/verify utils-spy)
- (launchProcessImpl (Matchers/eq exp-launch)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any)
- (Matchers/any))))
- (is (= (slurp worker-script) exp-script))
- ))
- (finally (Utils/forceDelete storm-local))))))
-
-(deftest test-workers-go-bananas
- ;; test that multiple workers are started for a port, and test that
- ;; supervisor shuts down propertly (doesn't shutdown the most
- ;; recently launched one, checks heartbeats correctly, etc.)
- )
-
-(deftest downloads-code
- )
-
-(deftest test-stateless
- )
-
-(deftest cleans-up-on-unassign
- ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
- )
-
-(deftest test-supervisor-data-acls
- (testing "supervisor-data uses correct ACLs"
- (let [scheme "digest"
- digest "storm:thisisapoorpassword"
- auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
- STORM-ZOOKEEPER-AUTH-PAYLOAD digest
- STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"}
- expected-acls (SupervisorUtils/supervisorZkAcls)
- fake-isupervisor (reify ISupervisor
- (getSupervisorId [this] nil)
- (getAssignmentId [this] nil))
- fake-cu (proxy [ConfigUtils] []
- (supervisorStateImpl [conf] nil)
- (supervisorLocalDirImpl [conf] nil))
- fake-utils (proxy [Utils] []
- (localHostnameImpl [] nil)
- (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
- (upTime [] 0))))
- cluster-utils (Mockito/mock ClusterUtils)]
- (with-open [_ (ConfigUtilsInstaller. fake-cu)
- _ (UtilsInstaller. fake-utils)
- mocked-cluster (MockedCluster. cluster-utils)]
- (SupervisorData. auth-conf nil fake-isupervisor)
- (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))))))
-
- (deftest test-write-log-metadata
- (testing "supervisor writes correct data to logs metadata file"
- (let [exp-owner "alice"
- exp-worker-id "42"
- exp-storm-id "0123456789"
- exp-port 4242
- exp-logs-users ["bob" "charlie" "daryl"]
- exp-logs-groups ["read-only-group" "special-group"]
- storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
- TOPOLOGY-USERS ["charlie" "bob"]
- TOPOLOGY-GROUPS ["special-group"]
- LOGS-GROUPS ["read-only-group"]
- LOGS-USERS ["daryl"]}
- exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
- "worker-id" exp-worker-id
- LOGS-USERS exp-logs-users
- LOGS-GROUPS exp-logs-groups}
- conf {}
- process-proxy (->> (proxy [SyncProcessEvent] []
- (writeLogMetadataToYamlFile [stormId port data conf] nil))
- Mockito/spy)]
- (.writeLogMetadata process-proxy storm-conf exp-owner exp-worker-id
- exp-storm-id exp-port conf)
- (.writeLogMetadataToYamlFile (Mockito/verify process-proxy (Mockito/times 1)) (Mockito/eq exp-storm-id) (Mockito/eq exp-port) (Mockito/any) (Mockito/eq conf)))))
-
- (deftest test-worker-launcher-requires-user
- (testing "worker-launcher throws on blank user"
- (let [utils-proxy (proxy [Utils] []
- (launchProcessImpl [& _] nil))]
- (with-open [_ (UtilsInstaller. utils-proxy)]
- (is (try
- (SupervisorUtils/processLauncher {} nil nil (ArrayList.) {} nil nil nil)
- false
- (catch Throwable t
- (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
- (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
-
- (defn found? [sub-str input-str]
- (if (string? input-str)
- (contrib-str/substring? sub-str (str input-str))
- (boolean (some #(contrib-str/substring? sub-str %) input-str))))
-
- (defn not-found? [sub-str input-str]
- (complement (found? sub-str input-str)))
-
- (deftest test-substitute-childopts-happy-path-string
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-happy-path-list
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-happy-path-list-arraylist
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-topology-id-alone
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-no-keys
- (testing "worker-launcher has no ids to replace in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-nil-childopts
- (testing "worker-launcher has nil childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts nil
- expected-childopts '[]
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-nil-ids
- (testing "worker-launcher has nil ids"
- (let [worker-id ""
- topology-id "s-01"
- port 9999
- mem-onheap (int 512)
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
- worker-manager (DefaultWorkerManager.)
- childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-retry-read-assignments
- (with-simulated-time-local-cluster [cluster
- :supervisors 0
- :ports-per-supervisor 2
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
- TOPOLOGY-ACKER-EXECUTORS 0}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind topology1 (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 2))}
- {}))
- (bind topology2 (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 2))}
- {}))
- (bind state (:storm-cluster-state cluster))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology1"
- {TOPOLOGY-WORKERS 2}
- topology1
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- ))
- (is (empty? (:launched changed)))
- (bind options (RebalanceOptions.))
- (.set_wait_secs options 0)
- (bind changed (capture-changed-workers
- (.rebalance (:nimbus cluster) "topology2" options)
- (advance-cluster-time cluster 10)
- (heartbeat-workers cluster "sup1" [1 2 3 4])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed)
- {"sup1" [1 2]}
- (StormCommon/getStormId (:storm-cluster-state cluster) "topology1"))
- (validate-launched-once (:launched changed)
- {"sup1" [3 4]}
- (StormCommon/getStormId (:storm-cluster-state cluster) "topology2"))
- )))
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/TestCgroups.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestCgroups.java b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
index 0857ba9..d732c36 100644
--- a/storm-core/test/jvm/org/apache/storm/TestCgroups.java
+++ b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
@@ -31,7 +31,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,7 +60,7 @@ public class TestCgroups {
CgroupManager manager = new CgroupManager();
manager.prepare(config);
- Map<String, Object> resourcesMap = new HashMap<String, Object>();
+ Map<String, Number> resourcesMap = new HashMap<>();
resourcesMap.put("cpu", 200);
resourcesMap.put("memory", 1024);
String workerId = UUID.randomUUID().toString();