You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by go...@apache.org on 2019/10/23 16:14:39 UTC
[storm] branch 2.1.x-branch updated: STORM-3515: Fixes passing main
arguments from python Storm cli to underlying Java cli
This is an automated email from the ASF dual-hosted git repository.
govind pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push:
new b16825b STORM-3515: Fixes passing main arguments from python Storm cli to underlying Java cli
b16825b is described below
commit b16825b5f8ff64d2afc38ee0779bc28cf2fc3c3f
Author: Govind Menon <go...@gmail.com>
AuthorDate: Fri Oct 4 16:06:41 2019 -0500
STORM-3515: Fixes passing main arguments from python Storm cli to underlying Java cli
---
bin/storm.py | 174 +++++++++++++++++++++------------
storm-client/test/py/test_storm_cli.py | 51 +++++-----
2 files changed, 135 insertions(+), 90 deletions(-)
diff --git a/bin/storm.py b/bin/storm.py
index 1aa2993..9c45c26 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -81,9 +81,10 @@ def get_java_cmd():
cmd = os.path.join(JAVA_HOME, 'bin', cmd)
return cmd
-def confvalue(name, storm_config_opts, extrapaths, daemon=True):
+def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, daemon=True):
command = [
- JAVA_CMD, "-client", get_config_opts(storm_config_opts), "-Dstorm.conf.file=" + CONF_FILE,
+ JAVA_CMD, "-client", get_config_opts(storm_config_opts),
+ "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
"-cp", get_classpath(extrajars=extrapaths, daemon=daemon), "org.apache.storm.command.ConfigValue", name
]
output = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()[0]
@@ -229,9 +230,12 @@ def resolve_dependencies(artifacts, artifact_repositories, maven_local_repos_dir
raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
-def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName=""):
- storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts, extrapaths=[CLUSTER_CONF_DIR])
- if(storm_log_dir == None or storm_log_dir == "null"):
+def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[],
+ extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName="",
+ overriding_conf_file=None):
+ storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts,
+ extrapaths=[CLUSTER_CONF_DIR], overriding_conf_file=overriding_conf_file)
+ if storm_log_dir is None or storm_log_dir in ["null", ""]:
storm_log_dir = os.path.join(STORM_DIR, "logs")
all_args = [
JAVA_CMD, jvmtype,
@@ -240,7 +244,7 @@ def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[], ex
"-Dstorm.home=" + STORM_DIR,
"-Dstorm.log.dir=" + storm_log_dir,
"-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
- "-Dstorm.conf.file=" + CONF_FILE,
+ "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
"-cp", get_classpath(extrajars, daemon, client=client),
] + jvmopts + [klass] + list(args)
print("Running: " + " ".join(all_args))
@@ -282,15 +286,18 @@ def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]):
daemon=False,
jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
- ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+ ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+ overriding_conf_file=args.config)
def print_localconfvalue(args):
- print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [USER_CONF_DIR]))
+ print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+ [USER_CONF_DIR], overriding_conf_file=args.config))
def print_remoteconfvalue(args):
- print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [CLUSTER_CONF_DIR]))
+ print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+ [CLUSTER_CONF_DIR], overriding_conf_file=args.config))
def initialize_main_command():
@@ -371,6 +378,14 @@ def add_common_options(parser, main_args=True):
nargs='*', help="Runs the main method with the specified arguments."
)
+def remove_common_options(sys_args):
+ flags_to_filter = ["-c", "-storm_config_opts", "--config"]
+ filtered_sys_args = [
+ sys_args[i] for i in range(0, len(sys_args)) if (not (sys_args[i] in flags_to_filter) and ((i<1) or
+ not (sys_args[i - 1] in flags_to_filter)))
+ ]
+ return filtered_sys_args
+
def add_topology_jar_options(parser):
parser.add_argument(
"topology_jar_path", metavar="topology-jar-path",
@@ -1094,25 +1109,28 @@ def sql(args):
args=sql_args,
daemon=False,
jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
- ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+ ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+ overriding_conf_file=args.config)
def kill(args):
exec_storm_class(
"org.apache.storm.command.KillTopology",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def upload_credentials(args):
- if (len(args.cred_list) %2 != 0):
+ if (len(args.cred_list) % 2 != 0):
raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + cred_list)
exec_storm_class(
"org.apache.storm.command.UploadCredentials",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def blob(args):
@@ -1120,32 +1138,36 @@ def blob(args):
raise argparse.ArgumentTypeError("Replication factor needed when doing blob update")
exec_storm_class(
"org.apache.storm.command.Blobstore",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def heartbeats(args):
exec_storm_class(
"org.apache.storm.command.Heartbeats",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def activate(args):
exec_storm_class(
"org.apache.storm.command.Activate",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def listtopos(args):
exec_storm_class(
"org.apache.storm.command.ListTopologies",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def set_log_level(args):
for log_level in args.l:
@@ -1158,16 +1180,18 @@ def set_log_level(args):
raise argparse.ArgumentTypeError("Should be in the form[logger name]=[log level][:optional timeout]")
exec_storm_class(
"org.apache.storm.command.SetLogLevel",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def deactivate(args):
exec_storm_class(
"org.apache.storm.command.Deactivate",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def rebalance(args):
@@ -1181,41 +1205,46 @@ def rebalance(args):
raise argparse.ArgumentTypeError("Should be in the form component_name:new_executor_count")
exec_storm_class(
"org.apache.storm.command.Rebalance",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def get_errors(args):
exec_storm_class(
"org.apache.storm.command.GetErrors",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def healthcheck(args):
exec_storm_class(
"org.apache.storm.command.HealthCheck",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def kill_workers(args):
exec_storm_class(
"org.apache.storm.command.KillWorkers",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def admin(args):
exec_storm_class(
"org.apache.storm.command.AdminCommands",
- args=args.main_args, storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def shell(args):
@@ -1228,21 +1257,24 @@ def shell(args):
args=runnerargs,
jvmtype="-client",
extrajars=[USER_CONF_DIR],
- fork=True)
+ fork=True,
+ overriding_conf_file=args.config)
os.system("rm " + tmpjarpath)
def repl(args):
cppaths = [CLUSTER_CONF_DIR]
exec_storm_class(
- "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths
+ "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths,
+ overriding_conf_file=args.config
)
-def get_log4j2_conf_dir(storm_config_opts):
+def get_log4j2_conf_dir(storm_config_opts, args):
cppaths = [CLUSTER_CONF_DIR]
storm_log4j2_conf_dir = confvalue(
- "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config
)
if(not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null"):
storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR
@@ -1255,18 +1287,19 @@ def nimbus(args):
cppaths = [CLUSTER_CONF_DIR]
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
)) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=nimbus.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
]
exec_storm_class(
"org.apache.storm.daemon.nimbus.Nimbus", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="nimbus",
extrajars=cppaths,
- jvmopts=jvmopts)
+ jvmopts=jvmopts,
+ overriding_conf_file=args.config)
def pacemaker(args):
@@ -1274,47 +1307,51 @@ def pacemaker(args):
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "pacemaker.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+ "pacemaker.childopts", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=pacemaker.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
]
exec_storm_class(
"org.apache.storm.pacemaker.Pacemaker", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="pacemaker",
extrajars=cppaths,
- jvmopts=jvmopts)
+ jvmopts=jvmopts,
+ overriding_conf_file=args.config)
def supervisor(args):
cppaths = [CLUSTER_CONF_DIR]
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "supervisor.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+ "supervisor.childopts", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=" + STORM_SUPERVISOR_LOG_FILE,
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
]
exec_storm_class(
"org.apache.storm.daemon.supervisor.Supervisor", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="supervisor",
extrajars=cppaths,
- jvmopts=jvmopts)
+ jvmopts=jvmopts,
+ overriding_conf_file=args.config)
def ui(args):
cppaths = [CLUSTER_CONF_DIR]
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+ "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=ui.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1324,7 +1361,8 @@ def ui(args):
jvmtype="-server",
daemonName="ui",
jvmopts=jvmopts,
- extrajars=allextrajars)
+ extrajars=allextrajars,
+ overriding_conf_file=args.config)
def logviewer(args):
@@ -1332,12 +1370,13 @@ def logviewer(args):
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(
confvalue(
- "logviewer.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "logviewer.childopts", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config
)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=logviewer.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1347,7 +1386,8 @@ def logviewer(args):
jvmtype="-server",
daemonName="logviewer",
jvmopts=jvmopts,
- extrajars=allextrajars)
+ extrajars=allextrajars,
+ overriding_conf_file=args.config)
def drpc_client(args):
@@ -1361,9 +1401,10 @@ def drpc_client(args):
exec_storm_class(
"org.apache.storm.command.BasicDrpcClient",
- args=sys.argv[2],
+ args=remove_common_options(sys.argv[2:]),
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def drpc(args):
@@ -1371,12 +1412,12 @@ def drpc(args):
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(
confvalue(
- "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=drpc.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
allextrajars.append(CLUSTER_CONF_DIR)
@@ -1385,28 +1426,31 @@ def drpc(args):
jvmtype="-server",
daemonName="drpc",
jvmopts=jvmopts,
- extrajars=allextrajars)
+ extrajars=allextrajars,
+ overriding_conf_file=args.config)
def dev_zookeeper(args):
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = [
"-Dlogfile.name=dev-zookeeper.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
exec_storm_class(
"org.apache.storm.command.DevZookeeper", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="dev_zookeeper",
jvmopts=jvmopts,
- extrajars=[CLUSTER_CONF_DIR])
+ extrajars=[CLUSTER_CONF_DIR],
+ overriding_conf_file=args.config)
def version(args):
exec_storm_class(
"org.apache.storm.utils.VersionInfo", storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[CLUSTER_CONF_DIR])
+ extrajars=[CLUSTER_CONF_DIR],
+ overriding_conf_file=args.config)
def print_classpath(args):
@@ -1420,7 +1464,7 @@ def print_server_classpath(args):
def monitor(args):
exec_storm_class(
"org.apache.storm.command.Monitor", storm_config_opts=args.storm_config_opts,
- args=sys.argv[2],
+ args=remove_common_options(sys.argv[2:]),
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/storm-client/test/py/test_storm_cli.py b/storm-client/test/py/test_storm_cli.py
index 20eb589..8084452 100644
--- a/storm-client/test/py/test_storm_cli.py
+++ b/storm-client/test/py/test_storm_cli.py
@@ -46,7 +46,7 @@ class TestStormCli(TestCase):
with mock.patch.object(sys, "argv", command_invocation):
self.cli_main()
if expected_output not in mock_shell_interface.call_args_list:
- print("Expected:" + str(expected_output))
+ print("Expected:" + str(expected_output))
print("Got:" + str(mock_shell_interface.call_args_list[-1]))
assert expected_output in mock_shell_interface.call_args_list
@@ -60,7 +60,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=+topology.blobstore.map%3D%27%7B%22key1%22%3A%7B%22localname%22%3A%22blob_file%22%2C+%22uncompress%22%3Afalse%7D%2C%22key2%22%3A%7B%7D%7D%27',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:./external/storm-redis/storm-redis-1.1.0.jar:./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.jar=example/storm-starter/storm-starter-topologies-*.jar', '-Dstorm.dependency.jars=./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafk [...]
'org.apache.storm.starter.RollingTopWords', 'blobstore-remote2', 'remote'
@@ -100,7 +100,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir +
@@ -121,7 +121,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd,
[self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' +
self.storm_dir +
@@ -138,7 +138,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.KillTopology', 'doomed_topology'
@@ -147,17 +147,18 @@ class TestStormCli(TestCase):
def test_upload_credentials_command(self):
self.base_test([
- 'storm','upload-credentials', '--config', '/some/other/storm.yaml', '-c', 'test=test', 'my-topology-name', 'appids', 'role.name1,role.name2'
+ 'storm', 'upload-credentials', '--config', '/some/other/storm.yaml', '-c', 'test=test', 'my-topology-name', 'appids', 'role.name1,role.name2'
], self.mock_execvp, mock.call(
self.java_cmd, [
- self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
- '-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
+ self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=test%3Dtest',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
+ '-Djava.library.path=', '-Dstorm.conf.file=/some/other/storm.yaml',
+ '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' +
self.storm_dir + '/conf:' + self.storm_dir +
'/bin', 'org.apache.storm.command.UploadCredentials',
- 'my-topology-name', 'appids role.name1,role.name"'])
+ 'my-topology-name', 'appids', 'role.name1,role.name2'])
)
def test_blobstore_command(self):
@@ -166,7 +167,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -181,7 +182,7 @@ class TestStormCli(TestCase):
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
'-Dstorm.home=' + self.storm_dir + '',
- '-Dstorm.log.dir=', '-Djava.library.path=',
+ '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
'-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -194,7 +195,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -208,7 +209,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -221,7 +222,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin',
@@ -235,7 +236,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -249,7 +250,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -263,7 +264,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -277,7 +278,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=nimbus', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -293,7 +294,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=supervisor', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -309,7 +310,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=pacemaker', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -325,7 +326,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=ui', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -342,7 +343,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=logviewer', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -359,7 +360,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=drpc', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -376,7 +377,7 @@ class TestStormCli(TestCase):
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
'-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
self.storm_dir + '/bin', 'org.apache.storm.command.HealthCheck'