You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by go...@apache.org on 2019/10/23 16:14:39 UTC

[storm] branch 2.1.x-branch updated: STORM-3515: Fixes passing main arguments from python Storm cli to underlying Java cli

This is an automated email from the ASF dual-hosted git repository.

govind pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new b16825b  STORM-3515: Fixes passing main arguments from python Storm cli to underlying Java cli
b16825b is described below

commit b16825b5f8ff64d2afc38ee0779bc28cf2fc3c3f
Author: Govind Menon <go...@gmail.com>
AuthorDate: Fri Oct 4 16:06:41 2019 -0500

    STORM-3515: Fixes passing main arguments from python Storm cli to underlying Java cli
---
 bin/storm.py                           | 174 +++++++++++++++++++++------------
 storm-client/test/py/test_storm_cli.py |  51 +++++-----
 2 files changed, 135 insertions(+), 90 deletions(-)

diff --git a/bin/storm.py b/bin/storm.py
index 1aa2993..9c45c26 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -81,9 +81,10 @@ def get_java_cmd():
         cmd = os.path.join(JAVA_HOME, 'bin', cmd)
     return cmd
 
-def confvalue(name, storm_config_opts, extrapaths, daemon=True):
+def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, daemon=True):
     command = [
-        JAVA_CMD, "-client", get_config_opts(storm_config_opts), "-Dstorm.conf.file=" + CONF_FILE,
+        JAVA_CMD, "-client", get_config_opts(storm_config_opts),
+        "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
         "-cp", get_classpath(extrajars=extrapaths, daemon=daemon), "org.apache.storm.command.ConfigValue", name
     ]
     output = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()[0]
@@ -229,9 +230,12 @@ def resolve_dependencies(artifacts, artifact_repositories, maven_local_repos_dir
         raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
 
 
-def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName=""):
-    storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts, extrapaths=[CLUSTER_CONF_DIR])
-    if(storm_log_dir == None or storm_log_dir == "null"):
+def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[],
+                     extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName="",
+                     overriding_conf_file=None):
+    storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts,
+                              extrapaths=[CLUSTER_CONF_DIR], overriding_conf_file=overriding_conf_file)
+    if storm_log_dir is None or storm_log_dir in ["null", ""]:
         storm_log_dir = os.path.join(STORM_DIR, "logs")
     all_args = [
         JAVA_CMD, jvmtype,
@@ -240,7 +244,7 @@ def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[], ex
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.log.dir=" + storm_log_dir,
        "-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
-       "-Dstorm.conf.file=" + CONF_FILE,
+       "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
        "-cp", get_classpath(extrajars, daemon, client=client),
     ] + jvmopts + [klass] + list(args)
     print("Running: " + " ".join(all_args))
@@ -282,15 +286,18 @@ def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]):
         daemon=False,
         jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
                 ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
-                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+        overriding_conf_file=args.config)
 
 
 def print_localconfvalue(args):
-    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [USER_CONF_DIR]))
+    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+                                            [USER_CONF_DIR], overriding_conf_file=args.config))
 
 
 def print_remoteconfvalue(args):
-    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [CLUSTER_CONF_DIR]))
+    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+                                            [CLUSTER_CONF_DIR], overriding_conf_file=args.config))
 
 
 def initialize_main_command():
@@ -371,6 +378,14 @@ def add_common_options(parser, main_args=True):
             nargs='*', help="Runs the main method with the specified arguments."
         )
 
+def remove_common_options(sys_args):
+    flags_to_filter = ["-c", "-storm_config_opts", "--config"]
+    filtered_sys_args = [
+        sys_args[i] for i in range(0, len(sys_args)) if (not (sys_args[i] in flags_to_filter) and ((i<1) or
+                                         not (sys_args[i - 1] in flags_to_filter)))
+    ]
+    return filtered_sys_args
+
 def add_topology_jar_options(parser):
     parser.add_argument(
         "topology_jar_path", metavar="topology-jar-path",
@@ -1094,25 +1109,28 @@ def sql(args):
         args=sql_args,
         daemon=False,
         jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
-                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+        overriding_conf_file=args.config)
 
 
 def kill(args):
     exec_storm_class(
         "org.apache.storm.command.KillTopology",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def upload_credentials(args):
-    if (len(args.cred_list) %2 != 0):
+    if (len(args.cred_list) % 2 != 0):
         raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + cred_list)
     exec_storm_class(
         "org.apache.storm.command.UploadCredentials",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def blob(args):
@@ -1120,32 +1138,36 @@ def blob(args):
         raise argparse.ArgumentTypeError("Replication factor needed when doing blob update")
     exec_storm_class(
         "org.apache.storm.command.Blobstore",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def heartbeats(args):
     exec_storm_class(
         "org.apache.storm.command.Heartbeats",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def activate(args):
     exec_storm_class(
         "org.apache.storm.command.Activate",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 def listtopos(args):
     exec_storm_class(
         "org.apache.storm.command.ListTopologies",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 def set_log_level(args):
     for log_level in args.l:
@@ -1158,16 +1180,18 @@ def set_log_level(args):
             raise argparse.ArgumentTypeError("Should be in the form[logger name]=[log level][:optional timeout]")
     exec_storm_class(
         "org.apache.storm.command.SetLogLevel",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 def deactivate(args):
     exec_storm_class(
         "org.apache.storm.command.Deactivate",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def rebalance(args):
@@ -1181,41 +1205,46 @@ def rebalance(args):
             raise argparse.ArgumentTypeError("Should be in the form component_name:new_executor_count")
     exec_storm_class(
         "org.apache.storm.command.Rebalance",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def get_errors(args):
     exec_storm_class(
         "org.apache.storm.command.GetErrors",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def healthcheck(args):
     exec_storm_class(
         "org.apache.storm.command.HealthCheck",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def kill_workers(args):
     exec_storm_class(
         "org.apache.storm.command.KillWorkers",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def admin(args):
     exec_storm_class(
         "org.apache.storm.command.AdminCommands",
-        args=args.main_args, storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def shell(args):
@@ -1228,21 +1257,24 @@ def shell(args):
         args=runnerargs,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR],
-        fork=True)
+        fork=True,
+        overriding_conf_file=args.config)
     os.system("rm " + tmpjarpath)
 
 
 def repl(args):
     cppaths = [CLUSTER_CONF_DIR]
     exec_storm_class(
-        "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths
+        "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths,
+        overriding_conf_file=args.config
     )
 
 
-def get_log4j2_conf_dir(storm_config_opts):
+def get_log4j2_conf_dir(storm_config_opts, args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_log4j2_conf_dir = confvalue(
-        "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts, extrapaths=cppaths
+        "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts,
+        extrapaths=cppaths, overriding_conf_file=args.config
     )
     if(not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null"):
         storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR
@@ -1255,18 +1287,19 @@ def nimbus(args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(confvalue(
-        "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+        "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
         )) + [
             "-Djava.deserialization.disabled=true",
             "-Dlogfile.name=nimbus.log",
-            "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+            "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
         ]
     exec_storm_class(
         "org.apache.storm.daemon.nimbus.Nimbus", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="nimbus",
         extrajars=cppaths,
-        jvmopts=jvmopts)
+        jvmopts=jvmopts,
+        overriding_conf_file=args.config)
 
 
 def pacemaker(args):
@@ -1274,47 +1307,51 @@ def pacemaker(args):
     storm_config_opts = get_config_opts(args.storm_config_opts)
 
     jvmopts = shlex.split(confvalue(
-        "pacemaker.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+        "pacemaker.childopts", storm_config_opts=storm_config_opts,
+        extrapaths=cppaths, overriding_conf_file=args.config)
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=pacemaker.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
         ]
     exec_storm_class(
         "org.apache.storm.pacemaker.Pacemaker", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="pacemaker",
         extrajars=cppaths,
-        jvmopts=jvmopts)
+        jvmopts=jvmopts,
+        overriding_conf_file=args.config)
 
 
 def supervisor(args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(confvalue(
-        "supervisor.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+        "supervisor.childopts", storm_config_opts=storm_config_opts,
+        extrapaths=cppaths, overriding_conf_file=args.config)
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=" + STORM_SUPERVISOR_LOG_FILE,
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
         ]
     exec_storm_class(
         "org.apache.storm.daemon.supervisor.Supervisor", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="supervisor",
         extrajars=cppaths,
-        jvmopts=jvmopts)
+        jvmopts=jvmopts,
+        overriding_conf_file=args.config)
 
 
 def ui(args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(confvalue(
-        "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+        "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config)
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=ui.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
 
     allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1324,7 +1361,8 @@ def ui(args):
         jvmtype="-server",
         daemonName="ui",
         jvmopts=jvmopts,
-        extrajars=allextrajars)
+        extrajars=allextrajars,
+        overriding_conf_file=args.config)
 
 
 def logviewer(args):
@@ -1332,12 +1370,13 @@ def logviewer(args):
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(
         confvalue(
-            "logviewer.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+            "logviewer.childopts", storm_config_opts=storm_config_opts,
+            extrapaths=cppaths, overriding_conf_file=args.config
         )
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=logviewer.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
 
     allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1347,7 +1386,8 @@ def logviewer(args):
         jvmtype="-server",
         daemonName="logviewer",
         jvmopts=jvmopts,
-        extrajars=allextrajars)
+        extrajars=allextrajars,
+        overriding_conf_file=args.config)
 
 
 def drpc_client(args):
@@ -1361,9 +1401,10 @@ def drpc_client(args):
 
     exec_storm_class(
         "org.apache.storm.command.BasicDrpcClient",
-        args=sys.argv[2],
+        args=remove_common_options(sys.argv[2:]),
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def drpc(args):
@@ -1371,12 +1412,12 @@ def drpc(args):
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(
         confvalue(
-            "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+            "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
         )
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=drpc.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
     allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
     allextrajars.append(CLUSTER_CONF_DIR)
@@ -1385,28 +1426,31 @@ def drpc(args):
         jvmtype="-server",
         daemonName="drpc",
         jvmopts=jvmopts,
-        extrajars=allextrajars)
+        extrajars=allextrajars,
+        overriding_conf_file=args.config)
 
 
 def dev_zookeeper(args):
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = [
         "-Dlogfile.name=dev-zookeeper.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
     exec_storm_class(
         "org.apache.storm.command.DevZookeeper", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="dev_zookeeper",
         jvmopts=jvmopts,
-        extrajars=[CLUSTER_CONF_DIR])
+        extrajars=[CLUSTER_CONF_DIR],
+        overriding_conf_file=args.config)
 
 
 def version(args):
     exec_storm_class(
         "org.apache.storm.utils.VersionInfo", storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[CLUSTER_CONF_DIR])
+        extrajars=[CLUSTER_CONF_DIR],
+        overriding_conf_file=args.config)
 
 
 def print_classpath(args):
@@ -1420,7 +1464,7 @@ def print_server_classpath(args):
 def monitor(args):
     exec_storm_class(
         "org.apache.storm.command.Monitor", storm_config_opts=args.storm_config_opts,
-        args=sys.argv[2],
+        args=remove_common_options(sys.argv[2:]),
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
 
diff --git a/storm-client/test/py/test_storm_cli.py b/storm-client/test/py/test_storm_cli.py
index 20eb589..8084452 100644
--- a/storm-client/test/py/test_storm_cli.py
+++ b/storm-client/test/py/test_storm_cli.py
@@ -46,7 +46,7 @@ class TestStormCli(TestCase):
         with mock.patch.object(sys, "argv", command_invocation):
             self.cli_main()
         if expected_output not in mock_shell_interface.call_args_list:
-            print("Expected:"  + str(expected_output))
+            print("Expected:" + str(expected_output))
             print("Got:" + str(mock_shell_interface.call_args_list[-1]))
         assert expected_output in mock_shell_interface.call_args_list
 
@@ -60,7 +60,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=+topology.blobstore.map%3D%27%7B%22key1%22%3A%7B%22localname%22%3A%22blob_file%22%2C+%22uncompress%22%3Afalse%7D%2C%22key2%22%3A%7B%7D%7D%27',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:./external/storm-redis/storm-redis-1.1.0.jar:./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.jar=example/storm-starter/storm-starter-topologies-*.jar', '-Dstorm.dependency.jars=./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafk [...]
                 'org.apache.storm.starter.RollingTopWords', 'blobstore-remote2', 'remote'
@@ -100,7 +100,7 @@ class TestStormCli(TestCase):
             ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir +
@@ -121,7 +121,7 @@ class TestStormCli(TestCase):
             ], self.mock_execvp, mock.call(
                 self.java_cmd,
                 [self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                 '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                 '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                  '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                  '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' +
                  self.storm_dir +
@@ -138,7 +138,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.KillTopology', 'doomed_topology'
@@ -147,17 +147,18 @@ class TestStormCli(TestCase):
 
     def test_upload_credentials_command(self):
         self.base_test([
-            'storm','upload-credentials', '--config', '/some/other/storm.yaml', '-c', 'test=test', 'my-topology-name', 'appids', 'role.name1,role.name2'
+            'storm', 'upload-credentials', '--config', '/some/other/storm.yaml', '-c', 'test=test', 'my-topology-name', 'appids', 'role.name1,role.name2'
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
-                self.java_cmd,  '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
-                '-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
+                self.java_cmd,  '-client', '-Ddaemon.name=', '-Dstorm.options=test%3Dtest',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
+                '-Djava.library.path=', '-Dstorm.conf.file=/some/other/storm.yaml',
+                '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
                                              self.storm_dir +
                                              '/extlib:' + self.storm_dir + '/extlib-daemon:' +
                                              self.storm_dir + '/conf:' + self.storm_dir +
                                              '/bin', 'org.apache.storm.command.UploadCredentials',
-                'my-topology-name', 'appids role.name1,role.name"'])
+                'my-topology-name', 'appids', 'role.name1,role.name2'])
         )
 
     def test_blobstore_command(self):
@@ -166,7 +167,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -181,7 +182,7 @@ class TestStormCli(TestCase):
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
                 '-Dstorm.home=' + self.storm_dir + '',
-                '-Dstorm.log.dir=', '-Djava.library.path=',
+                '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
                 '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -194,7 +195,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -208,7 +209,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -221,7 +222,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin',
@@ -235,7 +236,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -249,7 +250,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -263,7 +264,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -277,7 +278,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=nimbus', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -293,7 +294,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=supervisor', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -309,7 +310,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=pacemaker', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -325,7 +326,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=ui', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -342,7 +343,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=logviewer', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -359,7 +360,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=drpc', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -376,7 +377,7 @@ class TestStormCli(TestCase):
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
                 '-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
                 self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
                 self.storm_dir + '/bin', 'org.apache.storm.command.HealthCheck'