You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/16 16:48:13 UTC

[08/11] git commit: rm unused :use class; better handling InterruptedException; change MetricState and Poller better fits for Java

rm unused :use class; better handling InterruptedException; change MetricState and Poller better fits for Java


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e6b6e3ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e6b6e3ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e6b6e3ca

Branch: refs/heads/master
Commit: e6b6e3ca049f89222f2aad96e324aa47409383f1
Parents: b68c33a
Author: jiahong.ljh <ji...@alibaba-inc.com>
Authored: Tue Jul 8 11:56:53 2014 +0800
Committer: jiahong.ljh <ji...@alibaba-inc.com>
Committed: Tue Jul 8 11:56:53 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/command/monitor.clj  |  2 +-
 .../src/jvm/backtype/storm/utils/Monitor.java   | 82 ++++++++++++++------
 2 files changed, 61 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e6b6e3ca/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj
index 17498e9..36ccbc9 100644
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ b/storm-core/src/clj/backtype/storm/command/monitor.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.command.monitor
   (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm thrift config log])
+  (:use [backtype.storm.thrift :only [with-configured-nimbus-connection]])
   (:import [backtype.storm.utils Monitor])
   (:gen-class)
  )

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e6b6e3ca/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index 0c87d9b..43ddcf8 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -34,29 +34,67 @@ public class Monitor {
     private String _watch;
 
     private static class MetricsState {
-        long lastStatted = 0;
-        long lastTime = 0;
+        private long lastTime = 0;
+        private long lastStatted = 0;
+
+        private MetricsState(long lastTime, long lastStatted) {
+            this.lastTime = lastTime;
+            this.lastStatted = lastStatted;
+        }
+
+        public long getLastStatted() {
+            return lastStatted;
+        }
+
+        public void setLastStatted(long lastStatted) {
+            this.lastStatted = lastStatted;
+        }
+
+        public long getLastTime() {
+            return lastTime;
+        }
+
+        public void setLastTime(long lastTime) {
+            this.lastTime = lastTime;
+        }
     }
 
     private static class Poller {
-        long startTime = 0;
-        long pollMs = 0;
+        private long startTime = 0;
+        private long pollMs = 0;
+
+        private Poller(long startTime, long pollMs) {
+            this.startTime = startTime;
+            this.pollMs = pollMs;
+        }
 
-        public long nextPoll() {
+        public long nextPoll() throws InterruptedException {
             long now = System.currentTimeMillis();
             long cycle = (now - startTime) / pollMs;
             long wakeupTime = startTime + (pollMs * (cycle + 1));
             long sleepTime = wakeupTime - now;
             if (sleepTime > 0) {
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
+                Thread.sleep(sleepTime);
             }
             now = System.currentTimeMillis();
             return now;
         }
+
+        public long getStartTime() {
+            return startTime;
+        }
+
+        public void setStartTime(long startTime) {
+            this.startTime = startTime;
+        }
+
+        public long getPollMs() {
+            return pollMs;
+        }
+
+        public void setPollMs(long pollMs) {
+            this.pollMs = pollMs;
+        }
     }
 
     private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
@@ -112,18 +150,18 @@ public class Monitor {
         System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
 
         long pollMs = _interval * 1000;
-
-        MetricsState state = new MetricsState();
-        Poller poller = new Poller();
         long now = System.currentTimeMillis();
-        state.lastTime = now;
-        state.lastStatted = 0;
-        poller.startTime = now;
-        poller.pollMs = pollMs;
+        MetricsState state = new MetricsState(now, 0);
+        Poller poller = new Poller(now, pollMs);
 
         do {
             metrics(client, now, state);
-            now = poller.nextPoll();
+            try {
+                now = poller.nextPoll();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                break;
+            }
         } while (true);
     }
 
@@ -181,10 +219,10 @@ public class Monitor {
         if (!streamFound) {
             throw new IllegalArgumentException("stream: " + _stream + " not found");
         }
-        long timeDelta = now - state.lastTime;
-        long stattedDelta = totalStatted - state.lastStatted;
-        state.lastTime = now;
-        state.lastStatted = totalStatted;
+        long timeDelta = now - state.getLastTime();
+        long stattedDelta = totalStatted - state.getLastStatted();
+        state.setLastTime(now);
+        state.setLastStatted(totalStatted);
         double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
         System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
     }