You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/05 22:20:49 UTC
[3/5] storm git commit: Remove handling stdout
Remove handling stdout
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26b2f324
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26b2f324
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26b2f324
Branch: refs/heads/master
Commit: 26b2f324cbd68aea0839423b4316070d8200f7ae
Parents: 506c242 bb8d48d
Author: lewuathe <le...@me.com>
Authored: Thu Mar 19 23:13:29 2015 +0900
Committer: lewuathe <le...@me.com>
Committed: Thu Mar 19 23:13:29 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 21 +
DEVELOPER.md | 7 +
README.markdown | 2 +
SECURITY.md | 59 +-
STORM-UI-REST-API.md | 12 +
bin/storm | 594 +--------
bin/storm.py | 548 ++++++++
conf/defaults.yaml | 4 +-
conf/storm-env.sh | 24 +
dev-tools/test-ns.py | 17 +
docs/documentation/Clojure-DSL.md | 4 +-
docs/documentation/Command-line-client.md | 2 +-
docs/documentation/Common-patterns.md | 6 +-
docs/documentation/Concepts.md | 48 +-
docs/documentation/Configuration.md | 4 +-
docs/documentation/Distributed-RPC.md | 2 +-
.../Guaranteeing-message-processing.md | 6 +-
docs/documentation/Hooks.md | 6 +-
docs/documentation/Local-mode.md | 4 +-
docs/documentation/Powered-By.md | 23 +-
...unning-topologies-on-a-production-cluster.md | 6 +-
.../Serialization-(prior-to-0.6.0).md | 4 +-
docs/documentation/Serialization.md | 2 +-
docs/documentation/Structure-of-the-codebase.md | 8 +-
docs/documentation/Transactional-topologies.md | 18 +-
docs/documentation/Tutorial.md | 8 +-
...nding-the-parallelism-of-a-Storm-topology.md | 16 +-
external/README.md | 18 +
external/storm-jdbc/README.md | 84 +-
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 36 +-
.../org/apache/storm/jdbc/common/Column.java | 3 +-
.../apache/storm/jdbc/common/JdbcClient.java | 52 +-
.../jdbc/mapper/SimpleJdbcLookupMapper.java | 2 +-
.../storm/jdbc/trident/state/JdbcState.java | 18 +-
.../storm/jdbc/common/JdbcClientTest.java | 39 +-
.../jdbc/topology/UserPersistanceTopology.java | 2 +-
.../ExponentialBackoffMsgRetryManager.java | 2 +-
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 7 +-
.../test/storm/kafka/bolt/KafkaBoltTest.java | 2 +-
.../storm/redis/trident/state/RedisState.java | 2 +-
storm-core/pom.xml | 3 +-
storm-core/src/clj/backtype/storm/cluster.clj | 58 +-
storm-core/src/clj/backtype/storm/config.clj | 7 +-
storm-core/src/clj/backtype/storm/converter.clj | 201 +++
.../src/clj/backtype/storm/daemon/common.clj | 10 +-
.../src/clj/backtype/storm/daemon/executor.clj | 4 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 5 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 120 +-
.../clj/backtype/storm/daemon/supervisor.clj | 12 +-
.../src/clj/backtype/storm/daemon/task.clj | 11 +-
.../src/clj/backtype/storm/daemon/worker.clj | 3 +-
storm-core/src/clj/backtype/storm/stats.clj | 78 +-
storm-core/src/clj/backtype/storm/thrift.clj | 34 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 50 +-
storm-core/src/genthrift.sh | 13 +-
storm-core/src/java_license_header.txt | 17 +
storm-core/src/jvm/backtype/storm/Config.java | 40 +-
.../jvm/backtype/storm/ConfigValidation.java | 8 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 88 +-
.../storm/coordination/BatchBoltExecutor.java | 4 +-
.../backtype/storm/generated/Assignment.java | 983 ++++++++++++++
.../storm/generated/ClusterWorkerHeartbeat.java | 673 ++++++++++
.../backtype/storm/generated/ExecutorStats.java | 105 +-
.../jvm/backtype/storm/generated/NodeInfo.java | 556 ++++++++
.../jvm/backtype/storm/generated/StormBase.java | 1211 ++++++++++++++++++
.../storm/generated/SupervisorInfo.java | 1182 +++++++++++++++++
.../storm/generated/TopologyActionOptions.java | 387 ++++++
.../storm/generated/TopologyStatus.java | 68 +
.../backtype/storm/messaging/netty/Client.java | 10 +-
.../backtype/storm/messaging/netty/Context.java | 33 +-
.../auth/DefaultHttpCredentialsPlugin.java | 19 +-
.../storm/security/auth/ITransportPlugin.java | 4 +-
.../storm/security/auth/ReqContext.java | 28 +-
.../security/auth/SaslTransportPlugin.java | 3 +-
.../security/auth/SimpleTransportPlugin.java | 5 +-
.../storm/security/auth/TBackoffConnect.java | 4 +-
.../storm/security/auth/ThriftClient.java | 12 +-
.../authorizer/ImpersonationAuthorizer.java | 154 +++
.../auth/authorizer/SimpleACLAuthorizer.java | 55 +-
.../auth/digest/DigestSaslTransportPlugin.java | 6 +-
.../auth/digest/ServerCallbackHandler.java | 21 +-
.../kerberos/KerberosSaslTransportPlugin.java | 9 +-
.../auth/kerberos/ServerCallbackHandler.java | 38 +-
.../DefaultSerializationDelegate.java | 11 +-
.../GzipBridgeSerializationDelegate.java | 7 +-
.../GzipSerializationDelegate.java | 10 +-
.../serialization/SerializationDelegate.java | 2 +-
.../ThriftSerializationDelegate.java | 52 +
.../backtype/storm/topology/BoltDeclarer.java | 4 +
.../backtype/storm/topology/InputDeclarer.java | 128 ++
.../storm/topology/TopologyBuilder.java | 6 +-
.../jvm/backtype/storm/utils/DRPCClient.java | 2 +-
.../jvm/backtype/storm/utils/LocalState.java | 4 +-
.../jvm/backtype/storm/utils/NimbusClient.java | 19 +-
.../src/jvm/backtype/storm/utils/Utils.java | 82 +-
.../backtype/storm/utils/VersionedStore.java | 9 +-
storm-core/src/jvm/storm/trident/Stream.java | 2 +-
.../src/jvm/storm/trident/TridentTopology.java | 6 +-
storm-core/src/py/storm/Nimbus-remote | 0
storm-core/src/py/storm/ttypes.py | 827 +++++++++++-
storm-core/src/py_license_header.txt | 18 +
storm-core/src/storm.thrift | 51 +
.../templates/component-page-template.html | 2 +-
.../templates/topology-page-template.html | 4 +-
.../test/clj/backtype/storm/cluster_test.clj | 25 +-
.../test/clj/backtype/storm/nimbus_test.clj | 6 +-
.../auth/DefaultHttpCredentialsPlugin_test.clj | 16 +-
.../backtype/storm/security/auth/auth_test.clj | 146 ++-
.../GzipBridgeSerializationDelegateTest.java | 6 +-
.../ThriftBridgeSerializationDelegateTest.java | 60 +
110 files changed, 8473 insertions(+), 1086 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/26b2f324/bin/storm.py
----------------------------------------------------------------------
diff --cc bin/storm.py
index 0000000,2cf6c32..bc5c13d
mode 000000,100755..100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@@ -1,0 -1,543 +1,548 @@@
+ #!/usr/bin/python
+
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements. See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership. The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+ import os
+ import sys
+ import random
+ import subprocess as sub
+ import re
+ import shlex
+ try:
+ # python 3
+ from urllib.parse import quote_plus
+ except ImportError:
+ # python 2
+ from urllib import quote_plus
+ try:
+ # python 3
+ import configparser
+ except ImportError:
+ # python 2
+ import ConfigParser as configparser
+
+ def is_windows():
+ return sys.platform.startswith('win')
+
+ def identity(x):
+ return x
+
+ def cygpath(x):
+ command = ["cygpath", "-wp", x]
+ p = sub.Popen(command,stdout=sub.PIPE)
+ output, errors = p.communicate()
+ lines = output.split(os.linesep)
+ return lines[0]
+
+ def init_storm_env():
+ global CLUSTER_CONF_DIR
+ ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini')
+ if not os.path.isfile(ini_file):
+ return
+ config = configparser.ConfigParser()
+ config.optionxform = str
+ config.read(ini_file)
+ options = config.options('environment')
+ for option in options:
+ value = config.get('environment', option)
+ os.environ[option] = value
+
+ normclasspath = cygpath if sys.platform == 'cygwin' else identity
+ STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2])
+ USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm")
+ STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None)
+
+ if STORM_CONF_DIR == None:
+ CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf")
+ else:
+ CLUSTER_CONF_DIR = STORM_CONF_DIR
+
+ if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))):
+ USER_CONF_DIR = CLUSTER_CONF_DIR
+
+ STORM_LIB_DIR = os.path.join(STORM_DIR, "lib")
+ STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")
+ STORM_LOGBACK_CONF_DIR = os.path.join(STORM_DIR, "logback")
+
+ init_storm_env()
+
+ CONFIG_OPTS = []
+ CONFFILE = ""
+ JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', ''))
+ JAVA_HOME = os.getenv('JAVA_HOME', None)
+ JAVA_CMD = 'java' if not JAVA_HOME else os.path.join(JAVA_HOME, 'bin', 'java')
+
+ def get_config_opts():
+ global CONFIG_OPTS
+ return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS))
+
+ if not os.path.exists(STORM_LIB_DIR):
+ print("******************************************")
+ print("The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.")
+ print("\nYou can download a Storm release at http://storm-project.net/downloads.html")
+ print("******************************************")
+ sys.exit(1)
+
+ def get_jars_full(adir):
+ files = os.listdir(adir)
+ ret = []
+ for f in files:
+ if f.endswith(".jar"):
+ ret.append(os.path.join(adir, f))
+ return ret
+
+ def get_classpath(extrajars):
+ ret = get_jars_full(STORM_DIR)
+ ret.extend(get_jars_full(STORM_LIB_DIR))
+ ret.extend(extrajars)
+ return normclasspath(os.pathsep.join(ret))
+
+ def confvalue(name, extrapaths):
+ global CONFFILE
+ command = [
+ JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE,
+ "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name
+ ]
+ p = sub.Popen(command, stdout=sub.PIPE)
+ output, errors = p.communicate()
+ # python 3
+ if not isinstance(output, str):
+ output = output.decode('utf-8')
+ lines = output.split(os.linesep)
+ for line in lines:
+ tokens = line.split(" ")
+ if tokens[0] == "VALUE:":
+ return " ".join(tokens[1:])
+ return ""
+
+ def print_localconfvalue(name):
+ """Syntax: [storm localconfvalue conf-name]
+
+ Prints out the value for conf-name in the local Storm configs.
+ The local Storm configs are the ones in ~/.storm/storm.yaml merged
+ in with the configs in defaults.yaml.
+ """
+ print(name + ": " + confvalue(name, [USER_CONF_DIR]))
+
+ def print_remoteconfvalue(name):
+ """Syntax: [storm remoteconfvalue conf-name]
+
+ Prints out the value for conf-name in the cluster's Storm configs.
+ The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
+ merged in with the configs in defaults.yaml.
+
+ This command must be run on a cluster machine.
+ """
+ print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR]))
+
+ def parse_args(string):
+ r"""Takes a string of whitespace-separated tokens and parses it into a list.
+ Whitespace inside tokens may be quoted with single quotes, double quotes or
+ backslash (similar to command-line arguments in bash).
+
+ >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''')
+ ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n']
+ """
+ re_split = re.compile(r'''((?:
+ [^\s"'\\] |
+ "(?: [^"\\] | \\.)*" |
+ '(?: [^'\\] | \\.)*' |
+ \\.
+ )+)''', re.VERBOSE)
+ args = re_split.split(string)[1::2]
+ args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args]
+ args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args]
+ return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
+
+ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
+ global CONFFILE
+ storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])
+ if(storm_log_dir == None or storm_log_dir == "nil"):
+ storm_log_dir = os.path.join(STORM_DIR, "logs")
+ all_args = [
+ "java", jvmtype, get_config_opts(),
+ "-Dstorm.home=" + STORM_DIR,
+ "-Dstorm.log.dir=" + storm_log_dir,
+ "-Djava.library.path=" + confvalue("java.library.path", extrajars),
+ "-Dstorm.conf.file=" + CONFFILE,
+ "-cp", get_classpath(extrajars),
+ ] + jvmopts + [klass] + list(args)
+ print("Running: " + " ".join(all_args))
+ if fork:
+ os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
+ elif is_windows():
+ # handling whitespaces in JAVA_CMD
- sub.call(all_args)
++ try:
++ ret = sub.check_output(all_args, stderr=sub.STDOUT)
++ print(ret)
++ except sub.CalledProcessor as e:
++ sys.exit(e.returncode)
+ else:
+ os.execvp(JAVA_CMD, all_args)
++ os._exit()
+
+ def jar(jarfile, klass, *args):
+ """Syntax: [storm jar topology-jar-path class ...]
+
+ Runs the main method of class with the specified arguments.
+ The storm jars and configs in ~/.storm are put on the classpath.
+ The process is configured so that StormSubmitter
+ (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
+ will upload the jar at topology-jar-path when the topology is submitted.
+ """
+ exec_storm_class(
+ klass,
+ jvmtype="-client",
+ extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],
+ args=args,
+ jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])
+
+ def kill(*args):
+ """Syntax: [storm kill topology-name [-w wait-time-secs]]
+
+ Kills the topology with the name topology-name. Storm will
+ first deactivate the topology's spouts for the duration of
+ the topology's message timeout to allow all messages currently
+ being processed to finish processing. Storm will then shutdown
+ the workers and clean up their state. You can override the length
+ of time Storm waits between deactivation and shutdown with the -w flag.
+ """
+ exec_storm_class(
+ "backtype.storm.command.kill_topology",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+
+ def upload_credentials(*args):
+ """Syntax: [storm upload_credentials topology-name [credkey credvalue]*]
+
+ Uploads a new set of credentials to a running topology
+ """
+ exec_storm_class(
+ "backtype.storm.command.upload_credentials",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+ def activate(*args):
+ """Syntax: [storm activate topology-name]
+
+ Activates the specified topology's spouts.
+ """
+ exec_storm_class(
+ "backtype.storm.command.activate",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+ def listtopos(*args):
+ """Syntax: [storm list]
+
+ List the running topologies and their statuses.
+ """
+ exec_storm_class(
+ "backtype.storm.command.list",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+ def deactivate(*args):
+ """Syntax: [storm deactivate topology-name]
+
+ Deactivates the specified topology's spouts.
+ """
+ exec_storm_class(
+ "backtype.storm.command.deactivate",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+ def rebalance(*args):
+ """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]
+
+ Sometimes you may wish to spread out where the workers for a topology
+ are running. For example, let's say you have a 10 node cluster running
+ 4 workers per node, and then let's say you add another 10 nodes to
+ the cluster. You may wish to have Storm spread out the workers for the
+ running topology so that each node runs 2 workers. One way to do this
+ is to kill the topology and resubmit it, but Storm provides a "rebalance"
+ command that provides an easier way to do this.
+
+ Rebalance will first deactivate the topology for the duration of the
+ message timeout (overridable with the -w flag) and then redistribute
+ the workers evenly around the cluster. The topology will then return to
+ its previous state of activation (so a deactivated topology will still
+ be deactivated and an activated topology will go back to being activated).
+
+ The rebalance command can also be used to change the parallelism of a running topology.
+ Use the -n and -e switches to change the number of workers or number of executors of a component
+ respectively.
+ """
+ exec_storm_class(
+ "backtype.storm.command.rebalance",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+ def shell(resourcesdir, command, *args):
+ tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
+ os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
+ runnerargs = [tmpjarpath, command]
+ runnerargs.extend(args)
+ exec_storm_class(
+ "backtype.storm.command.shell_submission",
+ args=runnerargs,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR],
+ fork=True)
+ os.system("rm " + tmpjarpath)
+
+ def repl():
+ """Syntax: [storm repl]
+
+ Opens up a Clojure REPL with the storm jars and configuration
+ on the classpath. Useful for debugging.
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths)
+
+ def get_logback_conf_dir():
+ cppaths = [CLUSTER_CONF_DIR]
+ storm_logback_conf_dir = confvalue("storm.logback.conf.dir", cppaths)
+ if(storm_logback_conf_dir == None or storm_logback_conf_dir == "nil"):
+ storm_logback_conf_dir = STORM_LOGBACK_CONF_DIR
+ return storm_logback_conf_dir
+
+ def nimbus(klass="backtype.storm.daemon.nimbus"):
+ """Syntax: [storm nimbus]
+
+ Launches the nimbus daemon. This command should be run under
+ supervision with a tool like daemontools or monit.
+
+ See Setting up a Storm cluster for more information.
+ (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
+ "-Dlogfile.name=nimbus.log",
+ "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml"),
+ ]
+ exec_storm_class(
+ klass,
+ jvmtype="-server",
+ extrajars=cppaths,
+ jvmopts=jvmopts)
+
+ def supervisor(klass="backtype.storm.daemon.supervisor"):
+ """Syntax: [storm supervisor]
+
+ Launches the supervisor daemon. This command should be run
+ under supervision with a tool like daemontools or monit.
+
+ See Setting up a Storm cluster for more information.
+ (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
+ "-Dlogfile.name=supervisor.log",
+ "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml"),
+ ]
+ exec_storm_class(
+ klass,
+ jvmtype="-server",
+ extrajars=cppaths,
+ jvmopts=jvmopts)
+
+ def ui():
+ """Syntax: [storm ui]
+
+ Launches the UI daemon. The UI provides a web interface for a Storm
+ cluster and shows detailed stats about running topologies. This command
+ should be run under supervision with a tool like daemontools or monit.
+
+ See Setting up a Storm cluster for more information.
+ (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
+ "-Dlogfile.name=ui.log",
+ "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml")
+ ]
+ exec_storm_class(
+ "backtype.storm.ui.core",
+ jvmtype="-server",
+ jvmopts=jvmopts,
+ extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
+
+ def logviewer():
+ """Syntax: [storm logviewer]
+
+ Launches the log viewer daemon. It provides a web interface for viewing
+ storm log files. This command should be run under supervision with a
+ tool like daemontools or monit.
+
+ See Setting up a Storm cluster for more information.
+ (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [
+ "-Dlogfile.name=logviewer.log",
+ "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml")
+ ]
+ exec_storm_class(
+ "backtype.storm.daemon.logviewer",
+ jvmtype="-server",
+ jvmopts=jvmopts,
+ extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
+
+ def drpc():
+ """Syntax: [storm drpc]
+
+ Launches a DRPC daemon. This command should be run under supervision
+ with a tool like daemontools or monit.
+
+ See Distributed RPC for more information.
+ (http://storm.incubator.apache.org/documentation/Distributed-RPC)
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [
+ "-Dlogfile.name=drpc.log",
+ "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml")
+ ]
+ exec_storm_class(
+ "backtype.storm.daemon.drpc",
+ jvmtype="-server",
+ jvmopts=jvmopts,
+ extrajars=[CLUSTER_CONF_DIR])
+
+ def dev_zookeeper():
+ """Syntax: [storm dev-zookeeper]
+
+ Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and
+ "storm.zookeeper.port" as its port. This is only intended for development/testing, the
+ Zookeeper instance launched is not configured to be used in production.
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ exec_storm_class(
+ "backtype.storm.command.dev_zookeeper",
+ jvmtype="-server",
+ extrajars=[CLUSTER_CONF_DIR])
+
+ def version():
+ """Syntax: [storm version]
+
+ Prints the version number of this Storm release.
+ """
+ cppaths = [CLUSTER_CONF_DIR]
+ exec_storm_class(
+ "backtype.storm.utils.VersionInfo",
+ jvmtype="-client",
+ extrajars=[CLUSTER_CONF_DIR])
+
+ def print_classpath():
+ """Syntax: [storm classpath]
+
+ Prints the classpath used by the storm client when running commands.
+ """
+ print(get_classpath([]))
+
+ def monitor(*args):
+ """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]]
+
+ Monitor given topology's throughput interactively.
+ One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred]
+ By default,
+ poll-interval is 4 seconds;
+ all component-ids will be list;
+ stream-id is 'default';
+ watch-item is 'emitted';
+ """
+ exec_storm_class(
+ "backtype.storm.command.monitor",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+
+ def print_commands():
+ """Print all client commands and link to documentation"""
+ print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS.keys())))
+ print("\nHelp: \n\thelp \n\thelp <command>")
+ print("\nDocumentation for the storm client can be found at http://storm.incubator.apache.org/documentation/Command-line-client.html\n")
+ print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n")
+
+ def print_usage(command=None):
+ """Print one help message or list of available commands"""
+ if command != None:
+ if command in COMMANDS:
+ print(COMMANDS[command].__doc__ or
+ "No documentation provided for <%s>" % command)
+ else:
+ print("<%s> is not a valid command" % command)
+ else:
+ print_commands()
+
+ def unknown_command(*args):
+ print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:]))
+ print_usage()
+ sys.exit(254)
+
+ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer,
+ "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
+ "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
+ "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
+ "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor,
+ "upload-credentials": upload_credentials}
+
+ def parse_config(config_list):
+ global CONFIG_OPTS
+ if len(config_list) > 0:
+ for config in config_list:
+ CONFIG_OPTS.append(config)
+
+ def parse_config_opts(args):
+ curr = args[:]
+ curr.reverse()
+ config_list = []
+ args_list = []
+
+ while len(curr) > 0:
+ token = curr.pop()
+ if token == "-c":
+ config_list.append(curr.pop())
+ elif token == "--config":
+ global CONFFILE
+ CONFFILE = curr.pop()
+ else:
+ args_list.append(token)
+
+ return config_list, args_list
+
+ def main():
+ if len(sys.argv) <= 1:
+ print_usage()
+ sys.exit(-1)
+ global CONFIG_OPTS
+ config_list, args = parse_config_opts(sys.argv[1:])
+ parse_config(config_list)
+ COMMAND = args[0]
+ ARGS = args[1:]
+ (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
+
+ if __name__ == "__main__":
+ main()