You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/16 16:48:12 UTC
[07/11] git commit: make topology-name to be an argument;
make component-id optional
make topology-name to be an argument; make component-id optional
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b68c33a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b68c33a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b68c33a3
Branch: refs/heads/master
Commit: b68c33a31927c45f52b61007ba155d89590bf26c
Parents: 81cf56e
Author: jiahong.ljh <ji...@alibaba-inc.com>
Authored: Wed Jul 2 15:55:21 2014 +0800
Committer: jiahong.ljh <ji...@alibaba-inc.com>
Committed: Wed Jul 2 15:55:21 2014 +0800
----------------------------------------------------------------------
bin/storm | 13 ++-
.../src/clj/backtype/storm/command/monitor.clj | 5 +-
.../src/jvm/backtype/storm/utils/Monitor.java | 113 ++++++++++++-------
3 files changed, 84 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b68c33a3/bin/storm
----------------------------------------------------------------------
diff --git a/bin/storm b/bin/storm
index 9ec4ef4..9d7b133 100755
--- a/bin/storm
+++ b/bin/storm
@@ -391,10 +391,15 @@ def print_classpath():
print get_classpath([])
def monitor(*args):
- """Syntax: [storm monitor [-i interval-secs] -t topology-name -m component-id [-s stream-id] [-w [emitted | transferred]]*]
-
- Monitor topology's throughput interactively.
- One can specify poll-interval, topology's name, component-id, stream-id, watch-item[emitted | transferred]
+ """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]]
+
+ Monitor given topology's throughput interactively.
+ One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred]
+ By default,
+ poll-interval is 4 seconds;
+ all component-ids will be list;
+ stream-id is 'default';
+ watch-item is 'emitted';
"""
exec_storm_class(
"backtype.storm.command.monitor",
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b68c33a3/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj
index 0869032..17498e9 100644
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ b/storm-core/src/clj/backtype/storm/command/monitor.clj
@@ -21,15 +21,14 @@
)
(defn -main [& args]
- (let [[{interval :interval topology :topology component :component stream :stream watch :watch} [_] _]
+ (let [[{interval :interval component :component stream :stream watch :watch} [name] _]
(cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)]
- ["-t" "--topology" :default nil]
["-m" "--component" :default nil]
["-s" "--stream" :default "default"]
["-w" "--watch" :default "emitted"])
mon (Monitor.)]
(if interval (.set_interval mon interval))
- (if topology (.set_topology mon topology))
+ (if name (.set_topology mon name))
(if component (.set_component mon component))
(if stream (.set_stream mon stream))
(if watch (.set_watch mon watch))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b68c33a3/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index c517068..0c87d9b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -18,6 +18,9 @@
package backtype.storm.utils;
import backtype.storm.generated.*;
+
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
public class Monitor {
@@ -56,6 +59,28 @@ public class Monitor {
}
}
+ private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
+ HashSet<String> components = new HashSet<String>();
+ ClusterSummary clusterSummary = client.getClusterInfo();
+ TopologySummary topologySummary = null;
+ for (TopologySummary ts: clusterSummary.get_topologies()) {
+ if (topology.equals(ts.get_name())) {
+ topologySummary = ts;
+ break;
+ }
+ }
+ if (topologySummary == null) {
+ throw new IllegalArgumentException("topology: " + topology + " not found");
+ } else {
+ String id = topologySummary.get_id();
+ TopologyInfo info = client.getTopologyInfo(id);
+ for (ExecutorSummary es: info.get_executors()) {
+ components.add(es.get_component_id());
+ }
+ }
+ return components;
+ }
+
public void metrics(Nimbus.Client client) throws Exception {
if (_interval <= 0) {
throw new IllegalArgumentException("poll interval must be positive");
@@ -66,7 +91,15 @@ public class Monitor {
}
if (_component == null || _component.isEmpty()) {
- throw new IllegalArgumentException("component name must be something");
+ HashSet<String> components = getComponents(client, _topology);
+ System.out.println("Available components for " + _topology + " :");
+ System.out.println("------------------");
+ for (String comp : components) {
+ System.out.println(comp);
+ }
+ System.out.println("------------------");
+ System.out.println("Please use -m to specify one component");
+ return;
}
if (_stream == null || _stream.isEmpty()) {
@@ -76,7 +109,7 @@ public class Monitor {
if ( !WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
throw new IllegalArgumentException("watch item must either be transferred or emitted");
}
- System.out.println("topology\tslots\texecutors\texecutorsWithMetrics\tcomponent\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
+ System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
long pollMs = _interval * 1000;
@@ -97,63 +130,63 @@ public class Monitor {
public void metrics(Nimbus.Client client, long now, MetricsState state) throws Exception {
long totalStatted = 0;
- boolean topologyFound = false;
- boolean componentFound = false;
+ int componentParallelism = 0;
boolean streamFound = false;
- int slotsUsed = 0;
- int executors = 0;
- int executorsWithMetrics = 0;
- ClusterSummary summary = client.getClusterInfo();
- for (TopologySummary ts: summary.get_topologies()) {
+ ClusterSummary clusterSummary = client.getClusterInfo();
+ TopologySummary topologySummary = null;
+ for (TopologySummary ts: clusterSummary.get_topologies()) {
if (_topology.equals(ts.get_name())) {
- topologyFound = true;
- slotsUsed = ts.get_num_workers();
- String id = ts.get_id();
- TopologyInfo info = client.getTopologyInfo(id);
- for (ExecutorSummary es: info.get_executors()) {
- if (_component.equals(es.get_component_id())) {
- componentFound = true;
- executors ++;
- ExecutorStats stats = es.get_stats();
- if (stats != null) {
- Map<String,Map<String,Long>> statted =
- WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
- if ( statted != null) {
- Map<String, Long> e2 = statted.get(":all-time");
- if (e2 != null) {
- Long stream = e2.get(_stream);
- if (stream != null){
- streamFound = true;
- executorsWithMetrics ++;
- totalStatted += stream;
- }
+ topologySummary = ts;
+ break;
+ }
+ }
+ if (topologySummary == null) {
+ throw new IllegalArgumentException("topology: " + _topology + " not found");
+ } else {
+ String id = topologySummary.get_id();
+ TopologyInfo info = client.getTopologyInfo(id);
+ for (ExecutorSummary es: info.get_executors()) {
+ if (_component.equals(es.get_component_id())) {
+ componentParallelism ++;
+ ExecutorStats stats = es.get_stats();
+ if (stats != null) {
+ Map<String,Map<String,Long>> statted =
+ WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
+ if ( statted != null) {
+ Map<String, Long> e2 = statted.get(":all-time");
+ if (e2 != null) {
+ Long stream = e2.get(_stream);
+ if (stream != null){
+ streamFound = true;
+ totalStatted += stream;
}
}
}
}
-
-
}
}
}
- if (!topologyFound) {
- throw new IllegalArgumentException("topology: " + _topology + " not found");
- }
-
- if (!componentFound) {
- throw new IllegalArgumentException("component: " + _component + " not fouond");
+ if (componentParallelism <= 0) {
+ HashSet<String> components = getComponents(client, _topology);
+ System.out.println("Available components for " + _topology + " :");
+ System.out.println("------------------");
+ for (String comp : components) {
+ System.out.println(comp);
+ }
+ System.out.println("------------------");
+ throw new IllegalArgumentException("component: " + _component + " not found");
}
if (!streamFound) {
- throw new IllegalArgumentException("stream: " + _stream + " not fouond");
+ throw new IllegalArgumentException("stream: " + _stream + " not found");
}
long timeDelta = now - state.lastTime;
long stattedDelta = totalStatted - state.lastStatted;
state.lastTime = now;
state.lastStatted = totalStatted;
double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
- System.out.println(_topology+"\t"+slotsUsed+"\t"+executors+"\t"+executorsWithMetrics+"\t"+_component+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
+ System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
}
public void set_interval(int _interval) {