You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/16 16:48:10 UTC

[05/11] git commit: STORM-312 add storm monitor command line tools

STORM-312 add storm monitor command line tools


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/edcaf71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/edcaf71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/edcaf71b

Branch: refs/heads/master
Commit: edcaf71b304cc5196eec48035b06d15aaa8df433
Parents: a45b53a
Author: jiahong.ljh <ji...@alibaba-inc.com>
Authored: Fri May 16 15:14:05 2014 +0800
Committer: jiahong.ljh <ji...@alibaba-inc.com>
Committed: Fri May 16 15:14:05 2014 +0800

----------------------------------------------------------------------
 bin/storm                                       |  14 +-
 pom.xml                                         |   6 +
 storm-core/pom.xml                              |   4 +
 .../src/jvm/backtype/storm/command/Monitor.java | 197 +++++++++++++++++++
 4 files changed, 220 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/bin/storm
----------------------------------------------------------------------
diff --git a/bin/storm b/bin/storm
index bb6e40a..0600d45 100755
--- a/bin/storm
+++ b/bin/storm
@@ -390,6 +390,18 @@ def print_classpath():
     """
     print get_classpath([])
 
+def monitor(*args):
+    """Syntax: [storm monitor]
+
+    Monitor given topologies' given components' or type of components' throughput interactively.
+    """
+    exec_storm_class(
+        "backtype.storm.command.Monitor",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
+
+
 def print_commands():
     """Print all client commands and link to documentation"""
     print "Commands:\n\t",  "\n\t".join(sorted(COMMANDS.keys()))
@@ -416,7 +428,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui
             "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
             "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
             "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
-            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version}
+            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}
 
 def parse_config(config_list):
     global CONFIG_OPTS

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aa93fcc..484f5f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -464,6 +464,12 @@
 		      <version>3.8.1</version>
 		      <scope>test</scope>
 		    </dependency>
+
+            <dependency>
+                <groupId>args4j</groupId>
+                <artifactId>args4j</artifactId>
+                <version>2.0.16</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..33f6cc6 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -165,6 +165,10 @@
             <groupId>io.netty</groupId>
             <artifactId>netty</artifactId>
         </dependency>
+        <dependency>
+            <groupId>args4j</groupId>
+            <artifactId>args4j</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.mockito</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/storm-core/src/jvm/backtype/storm/command/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/command/Monitor.java b/storm-core/src/jvm/backtype/storm/command/Monitor.java
new file mode 100644
index 0000000..8f39d0c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/command/Monitor.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import backtype.storm.generated.*;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import java.util.Map;
+
+public class Monitor {
+    @Option(name="--help", aliases={"-h"}, usage="print help message")
+    private boolean _help = false;
+
+    @Option(name="--interval", aliases={"-i"}, usage="poll frequency in seconds")
+    private int _interval = 4;
+
+    @Option(name="--name", aliases={"--topologyName"}, metaVar="NAME",
+            usage="base name of the topology (numbers may be appended to the end)")
+    private String _name;
+
+    @Option(name="--component", aliases={"--componentName"}, metaVar="NAME",
+            usage="component name of the topology")
+    private String _component;
+
+    @Option(name="--stat", aliases={"--statItem"}, metaVar="ITEM",
+            usage="stat item [emitted | transferred]")
+    private String _stat = "emitted";
+
+    private static class MetricsState {
+        long lastStatted = 0;
+        long lastTime = 0;
+    }
+
+    public void metrics(Nimbus.Client client, int poll, String name, String component, String stat) throws Exception {
+        System.out.println("status\ttopologie\tslots\tcomponent\texecutors\texecutorsWithMetrics\ttime-diff ms\t" + stat + "\tthroughput (Kt/s)");
+        MetricsState state = new MetricsState();
+        long pollMs = poll * 1000;
+        long now = System.currentTimeMillis();
+        state.lastTime = now;
+        long startTime = now;
+        long cycle, sleepTime, wakeupTime;
+
+        while (metrics(client, name, component, stat, now, state, "WAITING")) {
+            now = System.currentTimeMillis();
+            cycle = (now - startTime)/pollMs;
+            wakeupTime = startTime + (pollMs * (cycle + 1));
+            sleepTime = wakeupTime - now;
+            if (sleepTime > 0) {
+                Thread.sleep(sleepTime);
+            }
+            now = System.currentTimeMillis();
+        }
+
+        now = System.currentTimeMillis();
+        cycle = (now - startTime)/pollMs;
+        wakeupTime = startTime + (pollMs * (cycle + 1));
+        sleepTime = wakeupTime - now;
+        if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+        }
+        now = System.currentTimeMillis();
+        do {
+            metrics(client, name, component, stat, now, state, "RUNNING");
+            now = System.currentTimeMillis();
+            cycle = (now - startTime)/pollMs;
+            wakeupTime = startTime + (pollMs * (cycle + 1));
+            sleepTime = wakeupTime - now;
+            if (sleepTime > 0) {
+                Thread.sleep(sleepTime);
+            }
+            now = System.currentTimeMillis();
+        } while (true);
+    }
+
+    public boolean metrics(Nimbus.Client client, String name, String component, String stat, long now, MetricsState state, String message) throws Exception {
+        long totalStatted = 0;
+
+        boolean topologyFound = false;
+        boolean componentFound = false;
+        int slotsUsed = 0;
+        int executors = 0;
+        int executorsWithMetrics = 0;
+        ClusterSummary summary = client.getClusterInfo();
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                topologyFound = true;
+                slotsUsed = ts.get_num_workers();
+                String id = ts.get_id();
+                TopologyInfo info = client.getTopologyInfo(id);
+                for (ExecutorSummary es: info.get_executors()) {
+                    if (component.equals(es.get_component_id())) {
+                        componentFound = true;
+                        executors ++;
+                        ExecutorStats stats = es.get_stats();
+                        if (stats != null) {
+                            Map<String,Map<String,Long>> statted =
+                                    "emitted".equals(stat) ? stats.get_emitted() : stats.get_transferred();
+                            if ( statted != null) {
+                                Map<String, Long> e2 = statted.get(":all-time");
+                                if (e2 != null) {
+                                    executorsWithMetrics++;
+                                    //topology messages are always on the default stream, so just count those
+                                    Long dflt = e2.get("default");
+                                    if (dflt != null){
+                                        totalStatted += dflt;
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+
+                }
+            }
+        }
+
+        if (!topologyFound) {
+            throw new IllegalArgumentException("topology: " + name + " not found");
+        }
+        if (!componentFound) {
+            throw new IllegalArgumentException("component: " + component + " not fouond");
+        }
+        long timeDelta = now - state.lastTime;
+        long stattedDelta = totalStatted - state.lastStatted;
+        state.lastTime = now;
+        state.lastStatted = totalStatted;
+        double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
+        System.out.println(message+"\t"+name+"\t"+slotsUsed+"\t"+component+"\t"+executors+"\t"+executorsWithMetrics+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
+
+        return !(executors > 0 && executorsWithMetrics >= executors);
+    }
+
+    public void realMain(String[] args) {
+        /*** parse command line ***/
+        CmdLineParser parser = new CmdLineParser(this);
+        parser.setUsageWidth(80);
+        try {
+            parser.parseArgument(args);
+        } catch( CmdLineException e ) {
+            System.err.println(e.getMessage());
+            _help = true;
+        }
+        if(_help) {
+            parser.printUsage(System.err);
+            System.err.println();
+            return;
+        }
+
+        if (_name == null || _name.isEmpty()) {
+            throw new IllegalArgumentException("topology name must be something");
+        }
+
+        if (_component == null || _component.isEmpty()) {
+            throw new IllegalArgumentException("component name must be something");
+        }
+
+        if (_interval <= 0) {
+            throw new IllegalArgumentException("poll interval must be positive");
+        }
+
+        if ( !"transferred".equals(_stat) && !"emitted".equals(_stat)) {
+            throw new IllegalArgumentException("stat item must either be transferred or emitted");
+        }
+
+        Map clusterConf = Utils.readStormConfig();
+        clusterConf.putAll(Utils.readCommandLineOpts());
+        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+        try {
+            metrics(client, _interval, _name, _component, _stat);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String[] args) {
+        new Monitor().realMain(args);
+    }
+}