You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2019/02/05 15:44:07 UTC

[GitHub] d2r commented on a change in pull request #2930: STORM-3274: Migrates storm CLI to using argparse making documentation…

d2r commented on a change in pull request #2930: STORM-3274: Migrates storm CLI to using argparse making documentation…
URL: https://github.com/apache/storm/pull/2930#discussion_r253919526
 
 

 ##########
 File path: bin/storm.py
 ##########
 @@ -296,787 +251,1090 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[]
     elif is_windows():
         # handling whitespaces in JAVA_CMD
         try:
-            ret = sub.check_output(all_args, stderr=sub.STDOUT)
+            ret = subprocess.check_output(all_args, stderr=subprocess.STDOUT)
             print(ret)
-        except sub.CalledProcessError as e:
+        except subprocess.CalledProcessError as e:
             print(e.output)
             sys.exit(e.returncode)
     else:
         os.execvp(JAVA_CMD, all_args)
     return exit_code
 
-def run_client_jar(jarfile, klass, args, daemon=False, client=True, extrajvmopts=[]):
-    global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD
 
-    local_jars = DEP_JARS_OPTS
-    artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD)
+def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]):
+    local_jars = args.jars.split(",")
+    jarfile = args.topology_jar_path
 
-    extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR]
+    artifact_to_file_jars = resolve_dependencies(
+        args.artifacts, args.artifactRepositories,
+        args.mavenLocalRepositoryDirectory, args.proxyUrl,
+        args.proxyUsername, args.proxyPassword
+    )
+
+    extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR]
     extra_jars.extend(local_jars)
     extra_jars.extend(artifact_to_file_jars.values())
     exec_storm_class(
-        klass,
+        klass, args.storm_config_opts,
         jvmtype="-client",
         extrajars=extra_jars,
-        args=args,
+        args=args.topology_main_args,
         daemon=False,
         jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
                 ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
                 ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
 
-def local(jarfile, klass, *args):
-    """Syntax: [storm local topology-jar-path class ...]
 
-    Runs the main method of class with the specified arguments but pointing to a local cluster
-    The storm jars and configs in ~/.storm are put on the classpath.
-    The process is configured so that StormSubmitter
-    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
-    and others will interact with a local cluster instead of the one configured by default.
+def print_localconfvalue(args):
+    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [USER_CONF_DIR]))
+
+
+def print_remoteconfvalue(args):
+    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [CLUSTER_CONF_DIR]))
+
+
+def initialize_main_command():
+    main_parser = argparse.ArgumentParser(prog="storm", formatter_class=SortingHelpFormatter)
+    add_common_options(main_parser)
+
+    subparsers = main_parser.add_subparsers(help="")
+
+    initialize_jar_subcommand(subparsers)
+    initialize_localconfvalue_subcommand(subparsers)
+    initialize_remoteconfvalue_subcommand(subparsers)
+    initialize_local_subcommand(subparsers)
+    initialize_sql_subcommand(subparsers)
+    initialize_kill_subcommand(subparsers)
+    initialize_upload_credentials_subcommand(subparsers)
+    initialize_blobstore_subcommand(subparsers)
+    initialize_heartbeats_subcommand(subparsers)
+    initialize_activate_subcommand(subparsers)
+    initialize_listtopos_subcommand(subparsers)
+    initialize_deactivate_subcommand(subparsers)
+    initialize_rebalance_subcommand(subparsers)
+    initialize_get_errors_subcommand(subparsers)
+    initialize_healthcheck_subcommand(subparsers)
+    initialize_kill_workers_subcommand(subparsers)
+    initialize_admin_subcommand(subparsers)
+    initialize_shell_subcommand(subparsers)
+    initialize_repl_subcommand(subparsers)
+    initialize_nimbus_subcommand(subparsers)
+    initialize_pacemaker_subcommand(subparsers)
+    initialize_supervisor_subcommand(subparsers)
+    initialize_ui_subcommand(subparsers)
+    initialize_logviewer_subcommand(subparsers)
+    initialize_drpc_client_subcommand(subparsers)
+    initialize_drpc_subcommand(subparsers)
+    initialize_dev_zookeeper_subcommand(subparsers)
+    initialize_version_subcommand(subparsers)
+    initialize_classpath_subcommand(subparsers)
+    initialize_server_classpath_subcommand(subparsers)
+    initialize_monitor_subcommand(subparsers)
+
+    return main_parser
+
+
+def initialize_localconfvalue_subcommand(subparsers):
+    command_help = '''Prints out the value for conf-name in the local Storm configs.
+    The local Storm configs are the ones in ~/.storm/storm.yaml merged
+    in with the configs in defaults.yaml.'''
 
-    Most options should work just like with the storm jar command.
+    sub_parser = subparsers.add_parser("localconfvalue", help=command_help, formatter_class=SortingHelpFormatter)
+    sub_parser.add_argument("conf_name")
+    sub_parser.set_defaults(func=print_localconfvalue)
+    add_common_options(sub_parser)
 
-    local also adds in the option --local-ttl which sets the number of seconds the
-    local cluster will run for before it shuts down.
 
-    --local-zookeeper if using an external zookeeper sets the connection string to use for it.
 
-    --java-debug lets you turn on java debugging and set the parameters passed to -agentlib:jdwp on the JDK
-    --java-debug transport=dt_socket,address=localhost:8000
-    will open up a debugging server on port 8000.
-    """
-    [ttl, lzk, debug_args, args] = parse_local_opts(args)
-    extrajvmopts = ["-Dstorm.local.sleeptime=" + ttl]
-    if lzk != None:
-        extrajvmopts = extrajvmopts + ["-Dstorm.local.zookeeper=" + lzk]
-    if debug_args != None:
-        extrajvmopts = extrajvmopts + ["-agentlib:jdwp=" + debug_args]
-    run_client_jar(jarfile, "org.apache.storm.LocalCluster", [klass] + list(args), client=False, daemon=False, extrajvmopts=extrajvmopts)
-
-def jar(jarfile, klass, *args):
-    """Syntax: [storm jar topology-jar-path class ...]
-
-    Runs the main method of class with the specified arguments.
-    The storm worker dependencies and configs in ~/.storm are put on the classpath.
-    The process is configured so that StormSubmitter
-    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
-    will upload the jar at topology-jar-path when the topology is submitted.
+def initialize_remoteconfvalue_subcommand(subparsers):
+    command_help = '''Prints out the value for conf-name in the cluster's Storm configs.
+    The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
+    merged in with the configs in defaults.yaml.
+
+    This command must be run on a cluster machine.'''
+
+    sub_parser = subparsers.add_parser("remoteconfvalue", help=command_help, formatter_class=SortingHelpFormatter)
+    sub_parser.add_argument("conf_name")
+    sub_parser.set_defaults(func=print_remoteconfvalue)
+    add_common_options(sub_parser)
 
-    When you want to ship other jars which is not included to application jar, you can pass them to --jars option with comma-separated string.
+
+def add_common_options(parser):
+    parser.add_argument("--config", default=None, help="Override default storm conf file")
+    parser.add_argument(
+        "-storm_config_opts", "-c", action="append", default=[],
+        help="Override storm conf properties , e.g. nimbus.ui.port=4443"
+    )
+
+def add_topology_jar_options(parser):
+    parser.add_argument("topology_jar_path", help="will upload the jar at topology-jar-path when the topology is submitted.")
+    parser.add_argument("topology_main_class", help="main class of the topology jar being submitted")
+    parser.add_argument("topology_main_args", nargs='*', help="Runs the main method with the specified arguments.")
+
+
+def add_client_jar_options(parser):
+
+    parser.add_argument("--jars", help='''
+    When you want to ship other jars which are not included to application jar, you can pass them to --jars option with comma-separated string.
     For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load your-local-jar.jar and your-local-jar2.jar.
-    And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string.
+    ''', default="")
+
+    parser.add_argument("--artifacts", help='''
+     When you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string.
     You can also exclude some dependencies like what you're doing in maven pom.
     Please add exclusion artifacts with '^' separated string after the artifact.
     For example, -artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load jedis and kafka-clients artifact and all of transitive dependencies but exclude slf4j-api from kafka.
+        ''', default="")
+
 
-    When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string.
+    parser.add_argument("--artifactRepositories", help='''
+    When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with a comma-separated string.
     Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters.
     For example, --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" will add JBoss and HDP repositories for dependency resolver.
-    You can provide local maven repository directory via --mavenLocalRepositoryDirectory if you would like to use specific directory. It might help when you don't have '.m2/repository' directory in home directory, because CWD is sometimes non-deterministic (fragile).
+    ''', default="")
 
-    You can also provide proxy information to let dependency resolver utilizing proxy if needed. There're three parameters for proxy:
-    --proxyUrl: URL representation of proxy ('http://host:port')
-    --proxyUsername: username of proxy if it requires basic auth
-    --proxyPassword: password of proxy if it requires basic auth
+    parser.add_argument("--mavenLocalRepositoryDirectory", help="You can provide local maven repository directory via --mavenLocalRepositoryDirectory if you would like to use specific directory. It might help when you don't have '.m2/repository' directory in home directory, because CWD is sometimes non-deterministic (fragile).", default="")
 
-    Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+
+    parser.add_argument("--proxyUrl", help="You can also provide proxy information to let dependency resolver utilizing proxy if needed. URL representation of proxy ('http://host:port')", default="")
+    parser.add_argument("--proxyUsername", help="username of proxy if it requires basic auth", default="")
+    parser.add_argument("--proxyPassword", help="password of proxy if it requires basic auth", default="")
+
+
+def initialize_jar_subcommand(subparsers):
+    jar_help = """Runs the main method of class with the specified arguments.
+    The storm worker dependencies and configs in ~/.storm are put on the classpath.
+    The process is configured so that StormSubmitter
+    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
+    will upload the jar at topology-jar-path when the topology is submitted.
 
     When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
+    """
+    jar_parser = subparsers.add_parser("jar", help=jar_help, formatter_class=SortingHelpFormatter)
+
+    add_topology_jar_options(jar_parser)
+    add_client_jar_options(jar_parser)
+
+    jar_parser.add_argument(
+        "--storm-server-classpath",
+        action='store_true',
+        help='''
+        If for some reason you need to have the full storm classpath,
+        not just the one for the worker you may include the command line option `--storm-server-classpath`.
+        Please be careful because this will add things to the classpath
+        that will not be on the worker classpath
+        and could result in the worker not running.'''
+    )
+
+    jar_parser.set_defaults(func=jar)
+    add_common_options(jar_parser)
+
+
+def initialize_local_subcommand(subparsers):
+    command_help = """Runs the main method of class with the specified arguments but pointing to a local cluster
+    The storm jars and configs in ~/.storm are put on the classpath.
+    The process is configured so that StormSubmitter
+    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
+    and others will interact with a local cluster instead of the one configured by default.
+
+    Most options should work just like with the storm jar command.
+    """
+    sub_parser = subparsers.add_parser("local", help=command_help, formatter_class=SortingHelpFormatter)
+
+    add_topology_jar_options(sub_parser)
+    add_client_jar_options(sub_parser)
+
+    sub_parser.add_argument(
+        "--local-ttl",
+        help="sets the number of seconds the local cluster will run for before it shuts down",
+        default=LOCAL_TTL_DEFAULT
+    )
+
+    sub_parser.add_argument(
+        "--java-debug",
+        help="lets you turn on java debugging and set the parameters passed to -agentlib:jdwp on the JDK" +
+             "e.g transport=dt_socket,address=localhost:8000 will open up a debugging server on port 8000",
+        default=None
+    )
+
+    sub_parser.set_defaults(func=local)
+    add_common_options(sub_parser)
+
+
+def initialize_kill_subcommand(subparsers):
+    command_help = """Kills the topology with the name topology-name. Storm will
+    first deactivate the topology's spouts for the duration of
+    the topology's message timeout to allow all messages currently
+    being processed to finish processing. Storm will then shutdown
+    the workers and clean up their state.
+    """
+    sub_parser = subparsers.add_parser("kill", help=command_help, formatter_class=SortingHelpFormatter)
+
+    sub_parser.add_argument("topology-name")
+
+    sub_parser.add_argument(
+        "-w", "--wait-time-secs",
+        help="""override the length of time Storm waits between deactivation and shutdown""",
+        default=None
+    )
+
+    sub_parser.set_defaults(func=kill)
+    add_common_options(sub_parser)
+
+
+def check_positive(value):
+    ivalue = int(value)
+    if ivalue <= 0:
+        raise argparse.ArgumentTypeError("%s is not a positive integer" % value)
+    return ivalue
+
+def check_even_list(cred_list):
+    if not (len(cred_list) % 2):
+        raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs")
+    return cred_list
+
+
+def initialize_upload_credentials_subcommand(subparsers):
+    command_help = """Uploads a new set of credentials to a running topology."""
+    sub_parser = subparsers.add_parser("upload-credentials", help=command_help, formatter_class=SortingHelpFormatter)
+
+    sub_parser.add_argument("topology-name")
+
+    sub_parser.add_argument(
+        "-f", "--file", default=None,
+        help="""provide a properties file with credentials in it to be uploaded"""
+    )
+
+    sub_parser.add_argument(
+        "-u", "--user", default=None,
+        help="""name of the owner of the topology (security precaution)"""
+    )
+
+    sub_parser.add_argument(
+        "cred_list", nargs='*', help="List of credkeys and their values [credkey credvalue]*",
+        type=check_even_list
+    )
+
+    sub_parser.set_defaults(func=upload_credentials)
+    add_common_options(sub_parser)
+
+
+def initialize_sql_subcommand(subparsers):
+    command_help = """Compiles the SQL statements into a Trident topology and submits it to Storm.
+    If user activates explain mode, SQL Runner analyzes each query statement
+    and shows query plan instead of submitting topology.
+    """
+
+    sub_parser = subparsers.add_parser("sql", help=command_help, formatter_class=SortingHelpFormatter)
+
+    add_client_jar_options(sub_parser)
+
+    sub_parser.add_argument("sql_file", metavar="sql-file")
+    sub_parser.add_argument(
+        "topology_name", metavar="topology-name", help="should be --explain to activate explain mode"
+    )
+
+    sub_parser.set_defaults(func=sql)
+    add_common_options(sub_parser)
+
+
+def initialize_blobstore_subcommand(subparsers):
+    sub_parser = subparsers.add_parser("blobstore", formatter_class=SortingHelpFormatter)
+    command_help = """
+    For example, the following would create a mytopo:data.tgz key using the data
+    stored in data.tgz.  User alice would have full access, bob would have
+    read/write access and everyone else would have read access.
+    storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r
+    """
+    sub_sub_parsers = sub_parser.add_subparsers(help=command_help)
+
+    list_parser = sub_sub_parsers.add_parser(
+        "list", help="lists blobs currently in the blob store", formatter_class=SortingHelpFormatter
+    )
+    list_parser.add_argument(
+        "keys", nargs='*')
+
+    cat_parser = sub_sub_parsers.add_parser(
+        "cat", help="read a blob and then either write it to a file, or STDOUT (requires read access).", formatter_class=SortingHelpFormatter
+    )
+    cat_parser.add_argument("KEY")
+    cat_parser.add_argument("-f", '--FILE', default=None)
+
+
+    create_parser = sub_sub_parsers.add_parser(
+        "create", help="create a new blob. Contents comes from a FILE or STDIN", formatter_class=SortingHelpFormatter
+    )
+    create_parser.add_argument("KEY")
+    create_parser.add_argument("-f", '--file', default=None)
+    create_parser.add_argument(
+        "-a", '--acl', default=None,
+        help="ACL is in the form [uo]:[username]:[r-][w-][a-] can be comma separated list."
+    )
+    create_parser.add_argument("-r", "--replication-factor", default=None)
+
+    update_parser = sub_sub_parsers.add_parser(
+        "update", help="update the contents of a blob.  Contents comes from a FILE or STDIN (requires write access).", formatter_class=SortingHelpFormatter
+    )
+    update_parser.add_argument("KEY")
+    update_parser.add_argument("-f", '--FILE', default=None)
+
+    delete_parser = sub_sub_parsers.add_parser(
+        "delete", help="delete an entry from the blob store (requires write access).", formatter_class=SortingHelpFormatter
+    )
+    delete_parser.add_argument("KEY")
+
+    set_acl_parser = sub_sub_parsers.add_parser(
+        "set-acl", help="set acls for the given key", formatter_class=SortingHelpFormatter
+    )
+    set_acl_parser.add_argument("KEY")
+    set_acl_parser.add_argument(
+        "-s", '--set', default=None,
+        help="""ACL is in the form [uo]:[username]:[r-][w-][a-] 
+        can be comma separated list (requires admin access)."""
+    )
+
+    replication_parser = sub_sub_parsers.add_parser(
+        "replication", formatter_class=SortingHelpFormatter
+    )
+    replication_parser.add_argument("KEY")
+    replication_parser.add_argument(
+        "--read", action="store_true", help="Used to read the replication factor of the blob"
+    )
+    replication_parser.add_argument(
+        "--update", action="store_true", help=" It is used to update the replication factor of a blob."
+    )
+    replication_parser.add_argument("-r", "--replication-factor", default=None)
+
+    sub_parser.set_defaults(func=blob)
+    add_common_options(sub_parser)
+
+
+def initialize_heartbeats_subcommand(subparsers):
+    sub_parser = subparsers.add_parser("heartbeats")
+    sub_sub_parsers = sub_parser.add_subparsers()
+
+    list_parser = sub_sub_parsers.add_parser(
+        "PATH", help="lists heartbeats nodes under PATH currently in the ClusterState", formatter_class=SortingHelpFormatter
+    )
+    list_parser.add_argument("PATH")
+
+    get_parser = sub_sub_parsers.add_parser(
+        "get", help="Get the heartbeat data at PATH", formatter_class=SortingHelpFormatter
+    )
+    get_parser.add_argument("PATH")
+    sub_parser.set_defaults(func=heartbeats)
+    add_common_options(sub_parser)
+
+
+def initialize_activate_subcommand(subparsers):
+    sub_parser = subparsers.add_parser(
+        "activate", help="Activates the specified topology's spouts.", formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.add_argument("topology-name")
+
+    sub_parser.set_defaults(func=activate)
+    add_common_options(sub_parser)
+
+
+def initialize_listtopos_subcommand(subparsers):
+    sub_parser = subparsers.add_parser(
+        "list", help="List the running topologies and their statuses.", formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.set_defaults(func=listtopos)
+    add_common_options(sub_parser)
+
+
+def initialize_deactivate_subcommand(subparsers):
+    sub_parser = subparsers.add_parser(
+        "deactivate", help="Deactivates the specified topology's spouts.", formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.add_argument("topology-name")
+
+    sub_parser.set_defaults(func=deactivate)
+    add_common_options(sub_parser)
+
+
+def initialize_rebalance_subcommand(subparsers):
+    command_help = """
+    Sometimes you may wish to spread out the workers for a running topology.
+    For example, let's say you have a 10 node cluster running
+    4 workers per node, and then let's say you add another 10 nodes to
+    the cluster. You may wish to have Storm spread out the workers for the
+    running topology so that each node runs 2 workers. One way to do this
+    is to kill the topology and resubmit it, but Storm provides a "rebalance"
+    command that provides an easier way to do this.
+
+    Rebalance will first deactivate the topology for the duration of the
+    message timeout (overridable with the -w flag) make requested adjustments to the topology
+    and let the scheduler try to find a better scheduling based off of the
+    new situation. The topology will then return to its previous state of activation
+    (so a deactivated topology will still be deactivated and an activated
+    topology will go back to being activated).
+    """
+    sub_parser = subparsers.add_parser(
+        "rebalance", help=command_help, formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.add_argument("-w", "--wait", default=None)
+
+    sub_parser.add_argument(
+        "-n", "--num-workers", default=None,
+        help="change the number of requested workers"
+    )
+
+    sub_parser.add_argument(
+        "-e", "--executors", action="append", default=[],
+        help="change the number of executors for a given component"
+    )
+
+    sub_parser.add_argument(
+        "-r", "--resources", default=None,
+        help="""
+        change the resources each component is requesting as used by the resource aware scheduler
+        e.g '{"component1": {"resource1": new_amount, "resource2": new_amount, ... }*}'
+        """
+    )
+
+    sub_parser.add_argument(
+        "-t", "--topology-conf", default=None,
+        help="change the topology conf"
+    )
+
+    sub_parser.add_argument("topology-name")
+
+    sub_parser.set_defaults(func=rebalance)
+    add_common_options(sub_parser)
+
+
+def initialize_get_errors_subcommand(subparsers):
+    sub_parser = subparsers.add_parser(
+        "get-errors", help="""Get the latest error from the running topology. The returned result contains
+    the key value pairs for component-name and component-error for the components in error.
+    The result is returned in json format.""", formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.add_argument("topology-name")
+
+    sub_parser.set_defaults(func=get_errors)
+    add_common_options(sub_parser)
+
+
+def initialize_healthcheck_subcommand(subparsers):
+    sub_parser = subparsers.add_parser(
+        "node-health-check", help="""Run health checks on the local supervisor.""", formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.set_defaults(func=healthcheck)
+    add_common_options(sub_parser)
+
+
+def initialize_kill_workers_subcommand(subparsers):
+    sub_parser = subparsers.add_parser(
+        "kill_workers", help="""Kill the workers running on this supervisor. This command should be run
+    on a supervisor node. If the cluster is running in secure mode, then user needs
+    to have admin rights on the node to be able to successfully kill all workers.""", formatter_class=SortingHelpFormatter
+    )
+
+    sub_parser.set_defaults(func=kill_workers)
+    add_common_options(sub_parser)
+
+
+def initialize_admin_subcommand(subparsers):
+    sub_parser = subparsers.add_parser("admin", help="""The storm admin command provides access to several operations that can help
+    an administrator debug or fix a cluster.""", formatter_class=SortingHelpFormatter)
+    sub_sub_parsers = sub_parser.add_subparsers()
+
+    remove_sub_sub_parser = sub_sub_parsers.add_parser(
+        "remove_corrupt_topologies", help="""This command should be run on a nimbus node as
+    the same user nimbus runs as.  It will go directly to zookeeper + blobstore
+    and find topologies that appear to be corrupted because of missing blobs.
+    It will kill those topologies.""", formatter_class=SortingHelpFormatter
+    )
+
+    add_common_options(remove_sub_sub_parser)
 
-    If for some reason you need to have the full storm classpath, not just the one for the worker you may include the command line option `--storm-server-classpath`.  Please be careful because this will add things to the classpath that will not be on the worker classpath and could result in the worker not running.
+    zk_cli_parser = sub_sub_parsers.add_parser(
+        "zk_cli", help="""This command will launch a zookeeper cli pointing to the
+    storm zookeeper instance logged in as the nimbus user.  It should be run on
+    a nimbus server as the user nimbus runs as.""", formatter_class=SortingHelpFormatter
+    )
+
+    zk_cli_parser.add_argument(
+        "-s", "--server", default=None, help="""Set the connection string to use,
+        defaults to storm connection string"""
+    )
+
+    zk_cli_parser.add_argument(
+        "-t", "--time-out", default=None, help="""Set the timeout to use, defaults to storm
+            zookeeper timeout."""
+    )
 
 Review comment:
   This has been addressed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services