You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/05 22:20:49 UTC

[3/5] storm git commit: Remove handling stdout

Remove handling stdout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26b2f324
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26b2f324
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26b2f324

Branch: refs/heads/master
Commit: 26b2f324cbd68aea0839423b4316070d8200f7ae
Parents: 506c242 bb8d48d
Author: lewuathe <le...@me.com>
Authored: Thu Mar 19 23:13:29 2015 +0900
Committer: lewuathe <le...@me.com>
Committed: Thu Mar 19 23:13:29 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md                                    |   21 +
 DEVELOPER.md                                    |    7 +
 README.markdown                                 |    2 +
 SECURITY.md                                     |   59 +-
 STORM-UI-REST-API.md                            |   12 +
 bin/storm                                       |  594 +--------
 bin/storm.py                                    |  548 ++++++++
 conf/defaults.yaml                              |    4 +-
 conf/storm-env.sh                               |   24 +
 dev-tools/test-ns.py                            |   17 +
 docs/documentation/Clojure-DSL.md               |    4 +-
 docs/documentation/Command-line-client.md       |    2 +-
 docs/documentation/Common-patterns.md           |    6 +-
 docs/documentation/Concepts.md                  |   48 +-
 docs/documentation/Configuration.md             |    4 +-
 docs/documentation/Distributed-RPC.md           |    2 +-
 .../Guaranteeing-message-processing.md          |    6 +-
 docs/documentation/Hooks.md                     |    6 +-
 docs/documentation/Local-mode.md                |    4 +-
 docs/documentation/Powered-By.md                |   23 +-
 ...unning-topologies-on-a-production-cluster.md |    6 +-
 .../Serialization-(prior-to-0.6.0).md           |    4 +-
 docs/documentation/Serialization.md             |    2 +-
 docs/documentation/Structure-of-the-codebase.md |    8 +-
 docs/documentation/Transactional-topologies.md  |   18 +-
 docs/documentation/Tutorial.md                  |    8 +-
 ...nding-the-parallelism-of-a-Storm-topology.md |   16 +-
 external/README.md                              |   18 +
 external/storm-jdbc/README.md                   |   84 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |   36 +-
 .../org/apache/storm/jdbc/common/Column.java    |    3 +-
 .../apache/storm/jdbc/common/JdbcClient.java    |   52 +-
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |    2 +-
 .../storm/jdbc/trident/state/JdbcState.java     |   18 +-
 .../storm/jdbc/common/JdbcClientTest.java       |   39 +-
 .../jdbc/topology/UserPersistanceTopology.java  |    2 +-
 .../ExponentialBackoffMsgRetryManager.java      |    2 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |    7 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |    2 +-
 .../storm/redis/trident/state/RedisState.java   |    2 +-
 storm-core/pom.xml                              |    3 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |   58 +-
 storm-core/src/clj/backtype/storm/config.clj    |    7 +-
 storm-core/src/clj/backtype/storm/converter.clj |  201 +++
 .../src/clj/backtype/storm/daemon/common.clj    |   10 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |    4 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |    5 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  120 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   12 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   11 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |    3 +-
 storm-core/src/clj/backtype/storm/stats.clj     |   78 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |   34 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   50 +-
 storm-core/src/genthrift.sh                     |   13 +-
 storm-core/src/java_license_header.txt          |   17 +
 storm-core/src/jvm/backtype/storm/Config.java   |   40 +-
 .../jvm/backtype/storm/ConfigValidation.java    |    8 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |   88 +-
 .../storm/coordination/BatchBoltExecutor.java   |    4 +-
 .../backtype/storm/generated/Assignment.java    |  983 ++++++++++++++
 .../storm/generated/ClusterWorkerHeartbeat.java |  673 ++++++++++
 .../backtype/storm/generated/ExecutorStats.java |  105 +-
 .../jvm/backtype/storm/generated/NodeInfo.java  |  556 ++++++++
 .../jvm/backtype/storm/generated/StormBase.java | 1211 ++++++++++++++++++
 .../storm/generated/SupervisorInfo.java         | 1182 +++++++++++++++++
 .../storm/generated/TopologyActionOptions.java  |  387 ++++++
 .../storm/generated/TopologyStatus.java         |   68 +
 .../backtype/storm/messaging/netty/Client.java  |   10 +-
 .../backtype/storm/messaging/netty/Context.java |   33 +-
 .../auth/DefaultHttpCredentialsPlugin.java      |   19 +-
 .../storm/security/auth/ITransportPlugin.java   |    4 +-
 .../storm/security/auth/ReqContext.java         |   28 +-
 .../security/auth/SaslTransportPlugin.java      |    3 +-
 .../security/auth/SimpleTransportPlugin.java    |    5 +-
 .../storm/security/auth/TBackoffConnect.java    |    4 +-
 .../storm/security/auth/ThriftClient.java       |   12 +-
 .../authorizer/ImpersonationAuthorizer.java     |  154 +++
 .../auth/authorizer/SimpleACLAuthorizer.java    |   55 +-
 .../auth/digest/DigestSaslTransportPlugin.java  |    6 +-
 .../auth/digest/ServerCallbackHandler.java      |   21 +-
 .../kerberos/KerberosSaslTransportPlugin.java   |    9 +-
 .../auth/kerberos/ServerCallbackHandler.java    |   38 +-
 .../DefaultSerializationDelegate.java           |   11 +-
 .../GzipBridgeSerializationDelegate.java        |    7 +-
 .../GzipSerializationDelegate.java              |   10 +-
 .../serialization/SerializationDelegate.java    |    2 +-
 .../ThriftSerializationDelegate.java            |   52 +
 .../backtype/storm/topology/BoltDeclarer.java   |    4 +
 .../backtype/storm/topology/InputDeclarer.java  |  128 ++
 .../storm/topology/TopologyBuilder.java         |    6 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |    2 +-
 .../jvm/backtype/storm/utils/LocalState.java    |    4 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   19 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   82 +-
 .../backtype/storm/utils/VersionedStore.java    |    9 +-
 storm-core/src/jvm/storm/trident/Stream.java    |    2 +-
 .../src/jvm/storm/trident/TridentTopology.java  |    6 +-
 storm-core/src/py/storm/Nimbus-remote           |    0
 storm-core/src/py/storm/ttypes.py               |  827 +++++++++++-
 storm-core/src/py_license_header.txt            |   18 +
 storm-core/src/storm.thrift                     |   51 +
 .../templates/component-page-template.html      |    2 +-
 .../templates/topology-page-template.html       |    4 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   25 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    6 +-
 .../auth/DefaultHttpCredentialsPlugin_test.clj  |   16 +-
 .../backtype/storm/security/auth/auth_test.clj  |  146 ++-
 .../GzipBridgeSerializationDelegateTest.java    |    6 +-
 .../ThriftBridgeSerializationDelegateTest.java  |   60 +
 110 files changed, 8473 insertions(+), 1086 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/26b2f324/bin/storm.py
----------------------------------------------------------------------
diff --cc bin/storm.py
index 0000000,2cf6c32..bc5c13d
mode 000000,100755..100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@@ -1,0 -1,543 +1,548 @@@
+ #!/usr/bin/python
+ 
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements.  See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership.  The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License.  You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ 
+ import os
+ import sys
+ import random
+ import subprocess as sub
+ import re
+ import shlex
+ try:
+     # python 3
+     from urllib.parse import quote_plus
+ except ImportError:
+     # python 2
+     from urllib import quote_plus
+ try:
+     # python 3
+     import configparser
+ except ImportError:
+     # python 2
+     import ConfigParser as configparser
+ 
+ def is_windows():
+     """Return True when sys.platform names a platform starting with 'win'."""
+     platform_name = sys.platform
+     return platform_name.startswith('win')
+ 
+ def identity(x):
+     # No-op stand-in used as the classpath normalizer when not on Cygwin.
+     return x
+ 
+ def cygpath(x):
+     """Translate a path list with the external `cygpath -wp` tool.
+ 
+     Returns the first line that the tool writes to its standard output.
+     """
+     command = ["cygpath", "-wp", x]
+     p = sub.Popen(command, stdout=sub.PIPE)
+     output, errors = p.communicate()
+     # python 3: communicate() returns bytes, which cannot be split on a str
+     if not isinstance(output, str):
+         output = output.decode('utf-8')
+     lines = output.split(os.linesep)
+     return lines[0]
+ 
+ def init_storm_env():
+     """Export the settings of <CLUSTER_CONF_DIR>/storm_env.ini to os.environ.
+ 
+     Every option in the file's [environment] section becomes an environment
+     variable of this process. Nothing happens when the file does not exist
+     or has no [environment] section.
+     """
+     ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini')
+     if not os.path.isfile(ini_file):
+         return
+     config = configparser.ConfigParser()
+     # keep option names exactly as written; the default transform lower-cases them
+     config.optionxform = str
+     config.read(ini_file)
+     # without this guard a file lacking the section aborts the client
+     # with NoSectionError before any command can run
+     if not config.has_section('environment'):
+         return
+     for option in config.options('environment'):
+         os.environ[option] = config.get('environment', option)
+ 
+ # Classpaths are passed through cygpath only when running under Cygwin.
+ if sys.platform == 'cygwin':
+     normclasspath = cygpath
+ else:
+     normclasspath = identity
+ # Drop the last two path components of this script's real location.
+ STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2])
+ USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm")
+ STORM_CONF_DIR = os.getenv('STORM_CONF_DIR')
+ 
+ # $STORM_CONF_DIR, when set, replaces the conf directory under STORM_DIR.
+ CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf") if STORM_CONF_DIR is None else STORM_CONF_DIR
+ 
+ # Without a storm.yaml in ~/.storm, user-level lookups use the cluster conf dir.
+ user_yaml = os.path.join(USER_CONF_DIR, "storm.yaml")
+ if not os.path.isfile(user_yaml):
+     USER_CONF_DIR = CLUSTER_CONF_DIR
+ 
+ STORM_LIB_DIR = os.path.join(STORM_DIR, "lib")
+ STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")
+ STORM_LOGBACK_CONF_DIR = os.path.join(STORM_DIR, "logback")
+ 
+ # Must run before the getenv() calls below: it may define the variables they read.
+ init_storm_env()
+ 
+ CONFIG_OPTS = []
+ CONFFILE = ""
+ JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', ''))
+ JAVA_HOME = os.getenv('JAVA_HOME')
+ JAVA_CMD = os.path.join(JAVA_HOME, 'bin', 'java') if JAVA_HOME else 'java'
+ 
+ def get_config_opts():
+     """Build the -Dstorm.options JVM flag from the collected overrides.
+ 
+     Each entry of CONFIG_OPTS is URL-quoted and the results are comma-joined.
+     """
+     quoted = [quote_plus(opt) for opt in CONFIG_OPTS]
+     return "-Dstorm.options=" + ','.join(quoted)
+ 
+ # The client takes its jars from STORM_LIB_DIR; when that directory does not
+ # exist, explain the likely cause and stop with exit status 1.
+ if not os.path.exists(STORM_LIB_DIR):
+     print("******************************************")
+     print("The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.")
+     print("\nYou can download a Storm release at http://storm-project.net/downloads.html")
+     print("******************************************")
+     sys.exit(1)
+ 
+ def get_jars_full(adir):
+     """Return the joined paths of every entry in adir whose name ends in .jar."""
+     return [os.path.join(adir, entry)
+             for entry in os.listdir(adir)
+             if entry.endswith(".jar")]
+ 
+ def get_classpath(extrajars):
+     """Assemble the JVM classpath string.
+ 
+     Order: jars in STORM_DIR, jars in STORM_LIB_DIR, then extrajars. The
+     entries are joined with os.pathsep and passed through normclasspath.
+     """
+     entries = get_jars_full(STORM_DIR) + get_jars_full(STORM_LIB_DIR) + list(extrajars)
+     return normclasspath(os.pathsep.join(entries))
+ 
+ def confvalue(name, extrapaths):
+     """Look up a config value by running the config_value Java command.
+ 
+     Starts JAVA_CMD with backtype.storm.command.config_value on a classpath
+     extended by extrapaths. Returns the text after "VALUE:" from the first
+     output line that starts with that marker, or "" when there is none.
+     """
+     command = [JAVA_CMD, "-client", get_config_opts()]
+     command.append("-Dstorm.conf.file=" + CONFFILE)
+     command.extend(["-cp", get_classpath(extrapaths)])
+     command.extend(["backtype.storm.command.config_value", name])
+     proc = sub.Popen(command, stdout=sub.PIPE)
+     raw_output, _ = proc.communicate()
+     # python 3 returns bytes here; python 2 already returns str
+     text = raw_output if isinstance(raw_output, str) else raw_output.decode('utf-8')
+     for row in text.split(os.linesep):
+         marker, _, remainder = row.partition(" ")
+         if marker == "VALUE:":
+             return remainder
+     return ""
+ 
+ def print_localconfvalue(name):
+     """Syntax: [storm localconfvalue conf-name]
+ 
+     Prints out the value for conf-name in the local Storm configs.
+     The local Storm configs are the ones in ~/.storm/storm.yaml merged
+     in with the configs in defaults.yaml.
+     """
+     # resolve with the user-level conf dir on the classpath
+     local_value = confvalue(name, [USER_CONF_DIR])
+     print(name + ": " + local_value)
+ 
+ def print_remoteconfvalue(name):
+     """Syntax: [storm remoteconfvalue conf-name]
+ 
+     Prints out the value for conf-name in the cluster's Storm configs.
+     The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
+     merged in with the configs in defaults.yaml.
+ 
+     This command must be run on a cluster machine.
+     """
+     # resolve with the cluster conf dir on the classpath
+     cluster_value = confvalue(name, [CLUSTER_CONF_DIR])
+     print(name + ": " + cluster_value)
+ 
+ def parse_args(string):
+     r"""Takes a string of whitespace-separated tokens and parses it into a list.
+     Whitespace inside tokens may be quoted with single quotes, double quotes or
+     backslash (similar to command-line arguments in bash).
+ 
+     >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''')
+     ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n']
+     """
+     re_split = re.compile(r'''((?:
+         [^\s"'\\] |
+         "(?: [^"\\] | \\.)*" |
+         '(?: [^'\\] | \\.)*' |
+         \\.
+     )+)''', re.VERBOSE)
+     # the capturing group puts the tokens at the odd indices of the split result
+     tokens = re_split.split(string)[1::2]
+     # unwrap double quotes, then single quotes, then drop escaping backslashes
+     unwrap_passes = (
+         r'"((?:[^"\\]|\\.)*)"',
+         r"'((?:[^'\\]|\\.)*)'",
+         r'\\(.)',
+     )
+     for pattern in unwrap_passes:
+         tokens = [re.sub(pattern, '\\1', tok) for tok in tokens]
+     return tokens
+ 
+ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
+     """Run the main method of a class inside a JVM configured for Storm.
+ 
+     klass     -- fully qualified name of the class to run
+     jvmtype   -- JVM mode flag, e.g. "-server" or "-client"
+     jvmopts   -- list of extra JVM options placed before the class name
+     extrajars -- extra classpath entries (jars or directories)
+     args      -- arguments passed to the class
+     fork      -- when true, run the JVM as a child process and wait for it
+ 
+     Without fork, the JVM replaces this process (os.execvp), so the call
+     does not return. On Windows the JVM runs as a child process instead:
+     its combined stdout/stderr is printed once it finishes, and a non-zero
+     exit status is passed on through sys.exit().
+ 
+     The list defaults are never mutated here, so sharing them is harmless.
+     """
+     storm_log_dir = confvalue("storm.log.dir", [CLUSTER_CONF_DIR])
+     # confvalue() returns "" when the lookup printed no value; "nil" is
+     # presumably what it prints for an unset key. Either way use the default.
+     if not storm_log_dir or storm_log_dir == "nil":
+         storm_log_dir = os.path.join(STORM_DIR, "logs")
+     # start with JAVA_CMD (not a bare "java") so the Windows branch, which
+     # runs all_args as-is, uses the JVM selected through JAVA_HOME
+     all_args = [
+         JAVA_CMD, jvmtype, get_config_opts(),
+         "-Dstorm.home=" + STORM_DIR,
+         "-Dstorm.log.dir=" + storm_log_dir,
+         "-Djava.library.path=" + confvalue("java.library.path", extrajars),
+         "-Dstorm.conf.file=" + CONFFILE,
+         "-cp", get_classpath(extrajars),
+     ] + jvmopts + [klass] + list(args)
+     print("Running: " + " ".join(all_args))
+     if fork:
+         os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
+     elif is_windows():
+         # handling whitespaces in JAVA_CMD
+         exit_code = 0
+         try:
+             output = sub.check_output(all_args, stderr=sub.STDOUT)
+         except sub.CalledProcessError as e:
+             # keep what the JVM printed before it failed
+             output = e.output
+             exit_code = e.returncode
+         # python 3 returns bytes; printing them raw would show b'...'
+         if not isinstance(output, str):
+             output = output.decode('utf-8')
+         print(output)
+         if exit_code != 0:
+             sys.exit(exit_code)
+     else:
+         # replaces the current process; raises OSError if the JVM cannot start
+         os.execvp(JAVA_CMD, all_args)
+ 
+ def jar(jarfile, klass, *args):
+     """Syntax: [storm jar topology-jar-path class ...]
+ 
+     Runs the main method of class with the specified arguments.
+     The storm jars and configs in ~/.storm are put on the classpath.
+     The process is configured so that StormSubmitter
+     (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
+     will upload the jar at topology-jar-path when the topology is submitted.
+     """
+     classpath_extras = [jarfile, USER_CONF_DIR, STORM_BIN_DIR]
+     # -Dstorm.jar carries the jar path into the JVM as a system property
+     jar_jvm_opts = JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile]
+     exec_storm_class(
+         klass,
+         jvmtype="-client",
+         jvmopts=jar_jvm_opts,
+         extrajars=classpath_extras,
+         args=args)
+ 
+ def kill(*args):
+     """Syntax: [storm kill topology-name [-w wait-time-secs]]
+ 
+     Kills the topology with the name topology-name. Storm will
+     first deactivate the topology's spouts for the duration of
+     the topology's message timeout to allow all messages currently
+     being processed to finish processing. Storm will then shutdown
+     the workers and clean up their state. You can override the length
+     of time Storm waits between deactivation and shutdown with the -w flag.
+     """
+     client_classpath = [USER_CONF_DIR, STORM_BIN_DIR]
+     exec_storm_class(
+         "backtype.storm.command.kill_topology",
+         jvmtype="-client",
+         extrajars=client_classpath,
+         args=args)
+ 
+ 
+ def upload_credentials(*args):
+     """Syntax: [storm upload-credentials topology-name [credkey credvalue]*]
+ 
+     Uploads a new set of credentials to a running topology
+     """
+     # The help text above names the command as it is registered in COMMANDS
+     # ("upload-credentials"); the underscore spelling is not a known command.
+     exec_storm_class(
+         "backtype.storm.command.upload_credentials",
+         args=args,
+         jvmtype="-client",
+         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ 
+ def activate(*args):
+     """Syntax: [storm activate topology-name]
+ 
+     Activates the specified topology's spouts.
+     """
+     client_classpath = [USER_CONF_DIR, STORM_BIN_DIR]
+     exec_storm_class(
+         "backtype.storm.command.activate",
+         jvmtype="-client",
+         extrajars=client_classpath,
+         args=args)
+ 
+ def listtopos(*args):
+     """Syntax: [storm list]
+ 
+     List the running topologies and their statuses.
+     """
+     client_classpath = [USER_CONF_DIR, STORM_BIN_DIR]
+     exec_storm_class(
+         "backtype.storm.command.list",
+         jvmtype="-client",
+         extrajars=client_classpath,
+         args=args)
+ 
+ def deactivate(*args):
+     """Syntax: [storm deactivate topology-name]
+ 
+     Deactivates the specified topology's spouts.
+     """
+     client_classpath = [USER_CONF_DIR, STORM_BIN_DIR]
+     exec_storm_class(
+         "backtype.storm.command.deactivate",
+         jvmtype="-client",
+         extrajars=client_classpath,
+         args=args)
+ 
+ def rebalance(*args):
+     """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]
+ 
+     Sometimes you may wish to spread out where the workers for a topology
+     are running. For example, let's say you have a 10 node cluster running
+     4 workers per node, and then let's say you add another 10 nodes to
+     the cluster. You may wish to have Storm spread out the workers for the
+     running topology so that each node runs 2 workers. One way to do this
+     is to kill the topology and resubmit it, but Storm provides a "rebalance"
+     command that provides an easier way to do this.
+ 
+     Rebalance will first deactivate the topology for the duration of the
+     message timeout (overridable with the -w flag) and then redistribute
+     the workers evenly around the cluster. The topology will then return to
+     its previous state of activation (so a deactivated topology will still
+     be deactivated and an activated topology will go back to being activated).
+ 
+     The rebalance command can also be used to change the parallelism of a running topology.
+     Use the -n and -e switches to change the number of workers or number of executors of a component
+     respectively.
+     """
+     client_classpath = [USER_CONF_DIR, STORM_BIN_DIR]
+     exec_storm_class(
+         "backtype.storm.command.rebalance",
+         jvmtype="-client",
+         extrajars=client_classpath,
+         args=args)
+ 
+ def shell(resourcesdir, command, *args):
+     """Syntax: [storm shell resourcesdir command args]
+ 
+     Packages resourcesdir into a temporary jar and runs
+     backtype.storm.command.shell_submission with that jar, the command and
+     its args. The temporary jar is deleted afterwards.
+     """
+     tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
+     try:
+         # list-form invocation: paths with spaces or shell metacharacters are
+         # passed to the jar tool verbatim instead of being re-parsed by a shell
+         jar_status = sub.call(["jar", "cf", tmpjarpath, resourcesdir])
+         if jar_status != 0:
+             print("Failed to package " + resourcesdir + " into " + tmpjarpath)
+             sys.exit(jar_status)
+         runnerargs = [tmpjarpath, command]
+         runnerargs.extend(args)
+         exec_storm_class(
+             "backtype.storm.command.shell_submission",
+             args=runnerargs,
+             jvmtype="-client",
+             extrajars=[USER_CONF_DIR],
+             fork=True)
+     finally:
+         # portable replacement for "rm"; also runs when submission raised
+         if os.path.exists(tmpjarpath):
+             os.remove(tmpjarpath)
+ 
+ def repl():
+     """Syntax: [storm repl]
+ 
+     Opens up a Clojure REPL with the storm jars and configuration
+     on the classpath. Useful for debugging.
+     """
+     exec_storm_class(
+         "clojure.main",
+         jvmtype="-client",
+         extrajars=[CLUSTER_CONF_DIR])
+ 
+ def get_logback_conf_dir():
+     """Return the logback configuration directory.
+ 
+     Uses the "storm.logback.conf.dir" config value, falling back to
+     STORM_LOGBACK_CONF_DIR when the lookup yields None or the text "nil".
+     """
+     configured = confvalue("storm.logback.conf.dir", [CLUSTER_CONF_DIR])
+     if configured is None or configured == "nil":
+         return STORM_LOGBACK_CONF_DIR
+     return configured
+ 
+ def nimbus(klass="backtype.storm.daemon.nimbus"):
+     """Syntax: [storm nimbus]
+ 
+     Launches the nimbus daemon. This command should be run under
+     supervision with a tool like daemontools or monit.
+ 
+     See Setting up a Storm cluster for more information.
+     (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+     """
+     cppaths = [CLUSTER_CONF_DIR]
+     # configured child options first, then the fixed logging settings
+     childopts = parse_args(confvalue("nimbus.childopts", cppaths))
+     logback_file = os.path.join(get_logback_conf_dir(), "cluster.xml")
+     jvmopts = childopts + [
+         "-Dlogfile.name=nimbus.log",
+         "-Dlogback.configurationFile=" + logback_file,
+     ]
+     exec_storm_class(klass, jvmtype="-server", jvmopts=jvmopts, extrajars=cppaths)
+ 
+ def supervisor(klass="backtype.storm.daemon.supervisor"):
+     """Syntax: [storm supervisor]
+ 
+     Launches the supervisor daemon. This command should be run
+     under supervision with a tool like daemontools or monit.
+ 
+     See Setting up a Storm cluster for more information.
+     (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+     """
+     cppaths = [CLUSTER_CONF_DIR]
+     # configured child options first, then the fixed logging settings
+     childopts = parse_args(confvalue("supervisor.childopts", cppaths))
+     logback_file = os.path.join(get_logback_conf_dir(), "cluster.xml")
+     jvmopts = childopts + [
+         "-Dlogfile.name=supervisor.log",
+         "-Dlogback.configurationFile=" + logback_file,
+     ]
+     exec_storm_class(klass, jvmtype="-server", jvmopts=jvmopts, extrajars=cppaths)
+ 
+ def ui():
+     """Syntax: [storm ui]
+ 
+     Launches the UI daemon. The UI provides a web interface for a Storm
+     cluster and shows detailed stats about running topologies. This command
+     should be run under supervision with a tool like daemontools or monit.
+ 
+     See Setting up a Storm cluster for more information.
+     (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+     """
+     # configured child options first, then the fixed logging settings
+     childopts = parse_args(confvalue("ui.childopts", [CLUSTER_CONF_DIR]))
+     logback_file = os.path.join(get_logback_conf_dir(), "cluster.xml")
+     jvmopts = childopts + [
+         "-Dlogfile.name=ui.log",
+         "-Dlogback.configurationFile=" + logback_file,
+     ]
+     exec_storm_class(
+         "backtype.storm.ui.core",
+         jvmtype="-server",
+         extrajars=[STORM_DIR, CLUSTER_CONF_DIR],
+         jvmopts=jvmopts)
+ 
+ def logviewer():
+     """Syntax: [storm logviewer]
+ 
+     Launches the log viewer daemon. It provides a web interface for viewing
+     storm log files. This command should be run under supervision with a
+     tool like daemontools or monit.
+ 
+     See Setting up a Storm cluster for more information.
+     (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+     """
+     # configured child options first, then the fixed logging settings
+     childopts = parse_args(confvalue("logviewer.childopts", [CLUSTER_CONF_DIR]))
+     logback_file = os.path.join(get_logback_conf_dir(), "cluster.xml")
+     jvmopts = childopts + [
+         "-Dlogfile.name=logviewer.log",
+         "-Dlogback.configurationFile=" + logback_file,
+     ]
+     exec_storm_class(
+         "backtype.storm.daemon.logviewer",
+         jvmtype="-server",
+         extrajars=[STORM_DIR, CLUSTER_CONF_DIR],
+         jvmopts=jvmopts)
+ 
+ def drpc():
+     """Syntax: [storm drpc]
+ 
+     Launches a DRPC daemon. This command should be run under supervision
+     with a tool like daemontools or monit.
+ 
+     See Distributed RPC for more information.
+     (http://storm.incubator.apache.org/documentation/Distributed-RPC)
+     """
+     cppaths = [CLUSTER_CONF_DIR]
+     # configured child options first, then the fixed logging settings
+     childopts = parse_args(confvalue("drpc.childopts", cppaths))
+     logback_file = os.path.join(get_logback_conf_dir(), "cluster.xml")
+     jvmopts = childopts + [
+         "-Dlogfile.name=drpc.log",
+         "-Dlogback.configurationFile=" + logback_file,
+     ]
+     exec_storm_class(
+         "backtype.storm.daemon.drpc",
+         jvmtype="-server",
+         extrajars=cppaths,
+         jvmopts=jvmopts)
+ 
+ def dev_zookeeper():
+     """Syntax: [storm dev-zookeeper]
+ 
+     Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and
+     "storm.zookeeper.port" as its port. This is only intended for development/testing, the
+     Zookeeper instance launched is not configured to be used in production.
+     """
+     zookeeper_classpath = [CLUSTER_CONF_DIR]
+     exec_storm_class(
+         "backtype.storm.command.dev_zookeeper",
+         extrajars=zookeeper_classpath,
+         jvmtype="-server")
+ 
+ def version():
+   """Syntax: [storm version]
+ 
+   Prints the version number of this Storm release.
+   """
+   version_classpath = [CLUSTER_CONF_DIR]
+   exec_storm_class(
+        "backtype.storm.utils.VersionInfo",
+        extrajars=version_classpath,
+        jvmtype="-client")
+ 
+ def print_classpath():
+     """Syntax: [storm classpath]
+ 
+     Prints the classpath used by the storm client when running commands.
+     """
+     # no extra entries: only the jars found in STORM_DIR and STORM_LIB_DIR
+     client_classpath = get_classpath([])
+     print(client_classpath)
+ 
+ def monitor(*args):
+     """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]]
+ 
+     Monitor given topology's throughput interactively.
+     One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred]
+     By default,
+         poll-interval is 4 seconds;
+         all component-ids will be list;
+         stream-id is 'default';
+         watch-item is 'emitted';
+     """
+     client_classpath = [USER_CONF_DIR, STORM_BIN_DIR]
+     exec_storm_class(
+         "backtype.storm.command.monitor",
+         jvmtype="-client",
+         extrajars=client_classpath,
+         args=args)
+ 
+ 
+ def print_commands():
+     """Print the sorted command names followed by pointers to further help."""
+     command_names = sorted(COMMANDS.keys())
+     print("Commands:\n\t" +  "\n\t".join(command_names))
+     print("\nHelp: \n\thelp \n\thelp <command>")
+     print("\nDocumentation for the storm client can be found at http://storm.incubator.apache.org/documentation/Command-line-client.html\n")
+     print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n")
+ 
+ def print_usage(command=None):
+     """Print one help message or list of available commands"""
+     # no command given: show the overview
+     if command is None:
+         print_commands()
+         return
+     if command not in COMMANDS:
+         print("<%s> is not a valid command" % command)
+         return
+     # a command's docstring doubles as its help text
+     help_text = COMMANDS[command].__doc__
+     print(help_text or
+           "No documentation provided for <%s>" % command)
+ 
+ def unknown_command(*args):
+     """Report the unrecognised command line, show usage and exit with 254."""
+     attempted = ' '.join(sys.argv[1:])
+     print("Unknown command: [storm %s]" % attempted)
+     print_usage()
+     sys.exit(254)
+ 
+ # Command name as typed on the command line -> function implementing it.
+ COMMANDS = {
+     "jar": jar,
+     "kill": kill,
+     "shell": shell,
+     "nimbus": nimbus,
+     "ui": ui,
+     "logviewer": logviewer,
+     "drpc": drpc,
+     "supervisor": supervisor,
+     "localconfvalue": print_localconfvalue,
+     "remoteconfvalue": print_remoteconfvalue,
+     "repl": repl,
+     "classpath": print_classpath,
+     "activate": activate,
+     "deactivate": deactivate,
+     "rebalance": rebalance,
+     "help": print_usage,
+     "list": listtopos,
+     "dev-zookeeper": dev_zookeeper,
+     "version": version,
+     "monitor": monitor,
+     "upload-credentials": upload_credentials,
+ }
+ 
+ def parse_config(config_list):
+     """Append every entry of config_list to the module-level CONFIG_OPTS."""
+     # extending in place needs no `global`; an empty list adds nothing
+     CONFIG_OPTS.extend(config_list)
+ 
+ def parse_config_opts(args):
+     """Separate config options from the rest of the command line.
+ 
+     "-c <value>" pairs are collected into the returned config list and
+     "--config <file>" stores <file> in the module-level CONFFILE. Every
+     other token is kept, in order, in the returned argument list.
+ 
+     Returns a (config_list, args_list) tuple. Prints a message and exits
+     with status -1 when "-c" or "--config" is the last token and so has no
+     value (this used to surface as an IndexError traceback).
+     """
+     global CONFFILE
+     config_list = []
+     args_list = []
+ 
+     tokens = iter(args)
+     for token in tokens:
+         if token not in ("-c", "--config"):
+             args_list.append(token)
+             continue
+         # the option's value is the next token, if there is one
+         value = next(tokens, None)
+         if value is None:
+             print("Missing value after [%s]" % token)
+             sys.exit(-1)
+         if token == "-c":
+             config_list.append(value)
+         else:
+             CONFFILE = value
+ 
+     return config_list, args_list
+ 
+ def main():
+     """Parse the command line and dispatch to the requested command.
+ 
+     Prints the usage overview and exits with status -1 when no command is
+     given, including when the command line holds only -c/--config options.
+     Unrecognised commands are passed to unknown_command().
+     """
+     if len(sys.argv) <= 1:
+         print_usage()
+         sys.exit(-1)
+     config_list, args = parse_config_opts(sys.argv[1:])
+     parse_config(config_list)
+     # e.g. "storm -c a=b": the options were consumed and no command is left
+     if not args:
+         print_usage()
+         sys.exit(-1)
+     COMMAND = args[0]
+     ARGS = args[1:]
+     (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
+ 
+ # Dispatch only when run as a script, not when imported as a module.
+ if __name__ == "__main__":
+     main()