You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/16 16:48:10 UTC
[05/11] git commit: STORM-312 add storm monitor command line tools
STORM-312 add storm monitor command line tools
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/edcaf71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/edcaf71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/edcaf71b
Branch: refs/heads/master
Commit: edcaf71b304cc5196eec48035b06d15aaa8df433
Parents: a45b53a
Author: jiahong.ljh <ji...@alibaba-inc.com>
Authored: Fri May 16 15:14:05 2014 +0800
Committer: jiahong.ljh <ji...@alibaba-inc.com>
Committed: Fri May 16 15:14:05 2014 +0800
----------------------------------------------------------------------
bin/storm | 14 +-
pom.xml | 6 +
storm-core/pom.xml | 4 +
.../src/jvm/backtype/storm/command/Monitor.java | 197 +++++++++++++++++++
4 files changed, 220 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/bin/storm
----------------------------------------------------------------------
diff --git a/bin/storm b/bin/storm
index bb6e40a..0600d45 100755
--- a/bin/storm
+++ b/bin/storm
@@ -390,6 +390,18 @@ def print_classpath():
"""
print get_classpath([])
+def monitor(*args):
+ """Syntax: [storm monitor --name topology-name --component component-id [--interval secs] [--stat emitted|transferred]]
+
+ Interactively monitor the throughput of one component of a running topology.
+ --name and --component are required. --interval is the poll frequency in
+ seconds (default 4). --stat selects the counter to report (default emitted).
+ Use --help to print the option list.
+ """
+ exec_storm_class(
+ "backtype.storm.command.Monitor",
+ args=args,
+ jvmtype="-client",
+ extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
+
+
def print_commands():
"""Print all client commands and link to documentation"""
print "Commands:\n\t", "\n\t".join(sorted(COMMANDS.keys()))
@@ -416,7 +428,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui
"drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
"remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
"activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
- "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version}
+ "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}
def parse_config(config_list):
global CONFIG_OPTS
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aa93fcc..484f5f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -464,6 +464,12 @@
<version>3.8.1</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.16</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..33f6cc6 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -165,6 +165,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edcaf71b/storm-core/src/jvm/backtype/storm/command/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/command/Monitor.java b/storm-core/src/jvm/backtype/storm/command/Monitor.java
new file mode 100644
index 0000000..8f39d0c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/command/Monitor.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import backtype.storm.generated.*;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import java.util.Map;
+
+/**
+ * Command line tool that polls Nimbus and prints the throughput of a single
+ * component of a running topology, one tab-separated row per poll interval.
+ */
+public class Monitor {
+    @Option(name="--help", aliases={"-h"}, usage="print help message")
+    private boolean _help = false;
+
+    @Option(name="--interval", aliases={"-i"}, usage="poll frequency in seconds")
+    private int _interval = 4;
+
+    // The topology is looked up by exact name, so the usage text must not
+    // promise prefix ("base name") matching.
+    @Option(name="--name", aliases={"--topologyName"}, metaVar="NAME",
+            usage="name of the topology")
+    private String _name;
+
+    @Option(name="--component", aliases={"--componentName"}, metaVar="NAME",
+            usage="component name of the topology")
+    private String _component;
+
+    @Option(name="--stat", aliases={"--statItem"}, metaVar="ITEM",
+            usage="stat item [emitted | transferred]")
+    private String _stat = "emitted";
+
+    /** Counter total and timestamp of the previous sample, used to compute deltas. */
+    private static class MetricsState {
+        long lastStatted = 0;
+        long lastTime = 0;
+    }
+
+    /**
+     * Samples the given component forever, printing a header and then one row
+     * per poll interval. Rows are labelled WAITING until every executor of the
+     * component reports metrics, and RUNNING from then on. This method never
+     * returns normally; it ends only when an exception is thrown.
+     *
+     * @param client    Nimbus client used for every sample
+     * @param poll      poll interval in seconds; must be positive
+     * @param name      exact name of the topology
+     * @param component component id within the topology
+     * @param stat      "emitted" selects the emitted counters; any other value selects transferred
+     * @throws IllegalArgumentException if poll is not positive, or the topology/component is not found
+     * @throws Exception anything propagated from the Nimbus calls or from Thread.sleep
+     */
+    public void metrics(Nimbus.Client client, int poll, String name, String component, String stat) throws Exception {
+        if (poll <= 0) {
+            // A zero interval would otherwise surface as a division by zero below.
+            throw new IllegalArgumentException("poll interval must be positive");
+        }
+        System.out.println("status\ttopology\tslots\tcomponent\texecutors\texecutorsWithMetrics\ttime-diff ms\t" + stat + "\tthroughput (Kt/s)");
+        MetricsState state = new MetricsState();
+        // Widen before multiplying so a large interval cannot overflow int arithmetic.
+        long pollMs = poll * 1000L;
+        long startTime = System.currentTimeMillis();
+        long now = startTime;
+        state.lastTime = now;
+
+        // Phase 1: keep sampling until all executors of the component report metrics.
+        while (metrics(client, name, component, stat, now, state, "WAITING")) {
+            now = sleepUntilNextCycle(startTime, pollMs);
+        }
+
+        // Phase 2: steady-state reporting.
+        now = sleepUntilNextCycle(startTime, pollMs);
+        while (true) {
+            metrics(client, name, component, stat, now, state, "RUNNING");
+            now = sleepUntilNextCycle(startTime, pollMs);
+        }
+    }
+
+    /**
+     * Sleeps until the next multiple of {@code pollMs} after {@code startTime},
+     * so samples stay aligned to a fixed grid however long a sample took.
+     *
+     * @param startTime origin of the polling grid in milliseconds
+     * @param pollMs    grid spacing in milliseconds; must be positive
+     * @return the current time in milliseconds after waking up
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    private static long sleepUntilNextCycle(long startTime, long pollMs) throws InterruptedException {
+        long now = System.currentTimeMillis();
+        long cycle = (now - startTime) / pollMs;
+        long wakeupTime = startTime + (pollMs * (cycle + 1));
+        long sleepTime = wakeupTime - now;
+        if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+        }
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * Takes one sample: sums the ":all-time" counter of the "default" stream
+     * over all executors of the component, then prints the delta and the
+     * throughput since the previous sample.
+     *
+     * @param client    Nimbus client used to fetch cluster and topology info
+     * @param name      exact name of the topology
+     * @param component component id within the topology
+     * @param stat      "emitted" selects the emitted counters; any other value selects transferred
+     * @param now       timestamp of this sample in milliseconds
+     * @param state     previous sample; updated in place with this sample
+     * @param message   label printed in the first column of the row
+     * @return true while at least one executor of the component has no metrics yet
+     * @throws IllegalArgumentException if the topology or the component is not found
+     * @throws Exception anything propagated from the Nimbus calls
+     */
+    public boolean metrics(Nimbus.Client client, String name, String component, String stat, long now, MetricsState state, String message) throws Exception {
+        long totalStatted = 0;
+
+        boolean topologyFound = false;
+        boolean componentFound = false;
+        int slotsUsed = 0;
+        int executors = 0;
+        int executorsWithMetrics = 0;
+        ClusterSummary summary = client.getClusterInfo();
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (!name.equals(ts.get_name())) {
+                continue;
+            }
+            topologyFound = true;
+            slotsUsed = ts.get_num_workers();
+            TopologyInfo info = client.getTopologyInfo(ts.get_id());
+            for (ExecutorSummary es: info.get_executors()) {
+                if (!component.equals(es.get_component_id())) {
+                    continue;
+                }
+                componentFound = true;
+                executors++;
+                ExecutorStats stats = es.get_stats();
+                if (stats == null) {
+                    continue;
+                }
+                Map<String,Map<String,Long>> statted =
+                        "emitted".equals(stat) ? stats.get_emitted() : stats.get_transferred();
+                Map<String, Long> allTime = (statted == null) ? null : statted.get(":all-time");
+                if (allTime == null) {
+                    continue;
+                }
+                executorsWithMetrics++;
+                //topology messages are always on the default stream, so just count those
+                Long dflt = allTime.get("default");
+                if (dflt != null) {
+                    totalStatted += dflt;
+                }
+            }
+        }
+
+        if (!topologyFound) {
+            throw new IllegalArgumentException("topology: " + name + " not found");
+        }
+        if (!componentFound) {
+            throw new IllegalArgumentException("component: " + component + " not found");
+        }
+        long timeDelta = now - state.lastTime;
+        long stattedDelta = totalStatted - state.lastStatted;
+        state.lastTime = now;
+        state.lastStatted = totalStatted;
+        // timeDelta is in milliseconds, so tuples per millisecond equals thousands of tuples per second.
+        double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
+        System.out.println(message+"\t"+name+"\t"+slotsUsed+"\t"+component+"\t"+executors+"\t"+executorsWithMetrics+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
+
+        // componentFound guarantees executors > 0 here, so "still waiting" means
+        // fewer executors with metrics than executors in total.
+        return executorsWithMetrics < executors;
+    }
+
+    /**
+     * Parses the command line, validates the options, connects to Nimbus and
+     * starts monitoring. Prints the usage to stderr and returns when --help is
+     * given or the arguments cannot be parsed. Exceptions raised while
+     * monitoring are printed, not rethrown.
+     *
+     * @param args command line arguments
+     * @throws IllegalArgumentException if a required option is missing or an option value is invalid
+     */
+    public void realMain(String[] args) {
+        /*** parse command line ***/
+        CmdLineParser parser = new CmdLineParser(this);
+        parser.setUsageWidth(80);
+        try {
+            parser.parseArgument(args);
+        } catch( CmdLineException e ) {
+            System.err.println(e.getMessage());
+            _help = true;
+        }
+        if(_help) {
+            parser.printUsage(System.err);
+            System.err.println();
+            return;
+        }
+
+        if (_name == null || _name.isEmpty()) {
+            throw new IllegalArgumentException("topology name must be something");
+        }
+
+        if (_component == null || _component.isEmpty()) {
+            throw new IllegalArgumentException("component name must be something");
+        }
+
+        if (_interval <= 0) {
+            throw new IllegalArgumentException("poll interval must be positive");
+        }
+
+        if ( !"transferred".equals(_stat) && !"emitted".equals(_stat)) {
+            throw new IllegalArgumentException("stat item must either be transferred or emitted");
+        }
+
+        Map clusterConf = Utils.readStormConfig();
+        clusterConf.putAll(Utils.readCommandLineOpts());
+        NimbusClient nimbus = NimbusClient.getConfiguredClient(clusterConf);
+        try {
+            metrics(nimbus.getClient(), _interval, _name, _component, _stat);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            // Monitoring only stops on an exception; release the connection when it does.
+            nimbus.close();
+        }
+    }
+
+    public static void main(String[] args) {
+        new Monitor().realMain(args);
+    }
+}