You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/16 16:48:12 UTC

[07/11] git commit: make topology-name to be an argument; make component-id optional

make topology-name to be an argument; make component-id optional


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b68c33a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b68c33a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b68c33a3

Branch: refs/heads/master
Commit: b68c33a31927c45f52b61007ba155d89590bf26c
Parents: 81cf56e
Author: jiahong.ljh <ji...@alibaba-inc.com>
Authored: Wed Jul 2 15:55:21 2014 +0800
Committer: jiahong.ljh <ji...@alibaba-inc.com>
Committed: Wed Jul 2 15:55:21 2014 +0800

----------------------------------------------------------------------
 bin/storm                                       |  13 ++-
 .../src/clj/backtype/storm/command/monitor.clj  |   5 +-
 .../src/jvm/backtype/storm/utils/Monitor.java   | 113 ++++++++++++-------
 3 files changed, 84 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b68c33a3/bin/storm
----------------------------------------------------------------------
diff --git a/bin/storm b/bin/storm
index 9ec4ef4..9d7b133 100755
--- a/bin/storm
+++ b/bin/storm
@@ -391,10 +391,15 @@ def print_classpath():
     print get_classpath([])
 
 def monitor(*args):
-    """Syntax: [storm monitor [-i interval-secs] -t topology-name -m component-id [-s stream-id] [-w [emitted | transferred]]*]
-
-    Monitor topology's throughput interactively.
-    One can specify poll-interval, topology's name, component-id, stream-id, watch-item[emitted | transferred]
+    """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]]
+
+    Monitor given topology's throughput interactively.
+    One can specify poll-interval, component-id, stream-id, and watch-item [emitted | transferred].
+    By default,
+        poll-interval is 4 seconds;
+        all component-ids will be listed;
+        stream-id is 'default';
+        watch-item is 'emitted';
     """
     exec_storm_class(
         "backtype.storm.command.monitor",

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b68c33a3/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj
index 0869032..17498e9 100644
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ b/storm-core/src/clj/backtype/storm/command/monitor.clj
@@ -21,15 +21,14 @@
  )
 
 (defn -main [& args]
-  (let [[{interval :interval topology :topology component :component stream :stream watch :watch} [_] _]
+  (let [[{interval :interval component :component stream :stream watch :watch} [name] _]
         (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)]
-          ["-t" "--topology" :default nil]
           ["-m" "--component" :default nil]
           ["-s" "--stream" :default "default"]
           ["-w" "--watch" :default "emitted"])
         mon (Monitor.)]
     (if interval (.set_interval mon interval))
-    (if topology (.set_topology mon topology))
+    (if name (.set_topology mon name))
     (if component (.set_component mon component))
     (if stream (.set_stream mon stream))
     (if watch (.set_watch mon watch))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b68c33a3/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index c517068..0c87d9b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -18,6 +18,9 @@
 package backtype.storm.utils;
 
 import backtype.storm.generated.*;
+
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
 public class Monitor {
@@ -56,6 +59,28 @@ public class Monitor {
         }
     }
 
+    private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
+        HashSet<String> components = new HashSet<String>();
+        ClusterSummary clusterSummary = client.getClusterInfo();
+        TopologySummary topologySummary = null;
+        for (TopologySummary ts: clusterSummary.get_topologies()) {
+            if (topology.equals(ts.get_name())) {
+                topologySummary = ts;
+                break;
+            }
+        }
+        if (topologySummary == null) {
+            throw new IllegalArgumentException("topology: " + topology + " not found");
+        } else {
+            String id = topologySummary.get_id();
+            TopologyInfo info = client.getTopologyInfo(id);
+            for (ExecutorSummary es: info.get_executors()) {
+                components.add(es.get_component_id());
+            }
+        }
+        return components;
+    }
+
     public void metrics(Nimbus.Client client) throws Exception {
         if (_interval <= 0) {
             throw new IllegalArgumentException("poll interval must be positive");
@@ -66,7 +91,15 @@ public class Monitor {
         }
 
         if (_component == null || _component.isEmpty()) {
-            throw new IllegalArgumentException("component name must be something");
+            HashSet<String> components = getComponents(client, _topology);
+            System.out.println("Available components for " + _topology + " :");
+            System.out.println("------------------");
+            for (String comp : components) {
+                System.out.println(comp);
+            }
+            System.out.println("------------------");
+            System.out.println("Please use -m to specify one component");
+            return;
         }
 
         if (_stream == null || _stream.isEmpty()) {
@@ -76,7 +109,7 @@ public class Monitor {
         if ( !WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
             throw new IllegalArgumentException("watch item must either be transferred or emitted");
         }
-        System.out.println("topology\tslots\texecutors\texecutorsWithMetrics\tcomponent\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
+        System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
 
         long pollMs = _interval * 1000;
 
@@ -97,63 +130,63 @@ public class Monitor {
     public void metrics(Nimbus.Client client, long now, MetricsState state) throws Exception {
         long totalStatted = 0;
 
-        boolean topologyFound = false;
-        boolean componentFound = false;
+        int componentParallelism = 0;
         boolean streamFound = false;
-        int slotsUsed = 0;
-        int executors = 0;
-        int executorsWithMetrics = 0;
-        ClusterSummary summary = client.getClusterInfo();
-        for (TopologySummary ts: summary.get_topologies()) {
+        ClusterSummary clusterSummary = client.getClusterInfo();
+        TopologySummary topologySummary = null;
+        for (TopologySummary ts: clusterSummary.get_topologies()) {
             if (_topology.equals(ts.get_name())) {
-                topologyFound = true;
-                slotsUsed = ts.get_num_workers();
-                String id = ts.get_id();
-                TopologyInfo info = client.getTopologyInfo(id);
-                for (ExecutorSummary es: info.get_executors()) {
-                    if (_component.equals(es.get_component_id())) {
-                        componentFound = true;
-                        executors ++;
-                        ExecutorStats stats = es.get_stats();
-                        if (stats != null) {
-                            Map<String,Map<String,Long>> statted =
-                                    WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
-                            if ( statted != null) {
-                                Map<String, Long> e2 = statted.get(":all-time");
-                                if (e2 != null) {
-                                    Long stream = e2.get(_stream);
-                                    if (stream != null){
-                                        streamFound = true;
-                                        executorsWithMetrics ++;
-                                        totalStatted += stream;
-                                    }
+                topologySummary = ts;
+                break;
+            }
+        }
+        if (topologySummary == null) {
+            throw new IllegalArgumentException("topology: " + _topology + " not found");
+        } else {
+            String id = topologySummary.get_id();
+            TopologyInfo info = client.getTopologyInfo(id);
+            for (ExecutorSummary es: info.get_executors()) {
+                if (_component.equals(es.get_component_id())) {
+                    componentParallelism ++;
+                    ExecutorStats stats = es.get_stats();
+                    if (stats != null) {
+                        Map<String,Map<String,Long>> statted =
+                                WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
+                        if ( statted != null) {
+                            Map<String, Long> e2 = statted.get(":all-time");
+                            if (e2 != null) {
+                                Long stream = e2.get(_stream);
+                                if (stream != null){
+                                    streamFound = true;
+                                    totalStatted += stream;
                                 }
                             }
                         }
                     }
-
-
                 }
             }
         }
 
-        if (!topologyFound) {
-            throw new IllegalArgumentException("topology: " + _topology + " not found");
-        }
-
-        if (!componentFound) {
-            throw new IllegalArgumentException("component: " + _component + " not fouond");
+        if (componentParallelism <= 0) {
+            HashSet<String> components = getComponents(client, _topology);
+            System.out.println("Available components for " + _topology + " :");
+            System.out.println("------------------");
+            for (String comp : components) {
+                System.out.println(comp);
+            }
+            System.out.println("------------------");
+            throw new IllegalArgumentException("component: " + _component + " not found");
         }
 
         if (!streamFound) {
-            throw new IllegalArgumentException("stream: " + _stream + " not fouond");
+            throw new IllegalArgumentException("stream: " + _stream + " not found");
         }
         long timeDelta = now - state.lastTime;
         long stattedDelta = totalStatted - state.lastStatted;
         state.lastTime = now;
         state.lastStatted = totalStatted;
         double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
-        System.out.println(_topology+"\t"+slotsUsed+"\t"+executors+"\t"+executorsWithMetrics+"\t"+_component+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
+        System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
     }
 
     public void set_interval(int _interval) {