You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/16 16:48:13 UTC
[08/11] git commit: remove unused :use namespaces;
handle InterruptedException better;
change MetricsState and Poller to better fit Java conventions
Remove unused :use namespaces; handle InterruptedException better; change MetricsState and Poller to better fit Java conventions
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e6b6e3ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e6b6e3ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e6b6e3ca
Branch: refs/heads/master
Commit: e6b6e3ca049f89222f2aad96e324aa47409383f1
Parents: b68c33a
Author: jiahong.ljh <ji...@alibaba-inc.com>
Authored: Tue Jul 8 11:56:53 2014 +0800
Committer: jiahong.ljh <ji...@alibaba-inc.com>
Committed: Tue Jul 8 11:56:53 2014 +0800
----------------------------------------------------------------------
.../src/clj/backtype/storm/command/monitor.clj | 2 +-
.../src/jvm/backtype/storm/utils/Monitor.java | 82 ++++++++++++++------
2 files changed, 61 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e6b6e3ca/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj
index 17498e9..36ccbc9 100644
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ b/storm-core/src/clj/backtype/storm/command/monitor.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns backtype.storm.command.monitor
(:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm thrift config log])
+ (:use [backtype.storm.thrift :only [with-configured-nimbus-connection]])
(:import [backtype.storm.utils Monitor])
(:gen-class)
)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e6b6e3ca/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index 0c87d9b..43ddcf8 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -34,29 +34,67 @@ public class Monitor {
private String _watch;
private static class MetricsState {
- long lastStatted = 0;
- long lastTime = 0;
+ private long lastTime = 0;
+ private long lastStatted = 0;
+
+ private MetricsState(long lastTime, long lastStatted) {
+ this.lastTime = lastTime;
+ this.lastStatted = lastStatted;
+ }
+
+ public long getLastStatted() {
+ return lastStatted;
+ }
+
+ public void setLastStatted(long lastStatted) {
+ this.lastStatted = lastStatted;
+ }
+
+ public long getLastTime() {
+ return lastTime;
+ }
+
+ public void setLastTime(long lastTime) {
+ this.lastTime = lastTime;
+ }
}
private static class Poller {
- long startTime = 0;
- long pollMs = 0;
+ private long startTime = 0;
+ private long pollMs = 0;
+
+ private Poller(long startTime, long pollMs) {
+ this.startTime = startTime;
+ this.pollMs = pollMs;
+ }
- public long nextPoll() {
+ public long nextPoll() throws InterruptedException {
long now = System.currentTimeMillis();
long cycle = (now - startTime) / pollMs;
long wakeupTime = startTime + (pollMs * (cycle + 1));
long sleepTime = wakeupTime - now;
if (sleepTime > 0) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ Thread.sleep(sleepTime);
}
now = System.currentTimeMillis();
return now;
}
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getPollMs() {
+ return pollMs;
+ }
+
+ public void setPollMs(long pollMs) {
+ this.pollMs = pollMs;
+ }
}
private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
@@ -112,18 +150,18 @@ public class Monitor {
System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
long pollMs = _interval * 1000;
-
- MetricsState state = new MetricsState();
- Poller poller = new Poller();
long now = System.currentTimeMillis();
- state.lastTime = now;
- state.lastStatted = 0;
- poller.startTime = now;
- poller.pollMs = pollMs;
+ MetricsState state = new MetricsState(now, 0);
+ Poller poller = new Poller(now, pollMs);
do {
metrics(client, now, state);
- now = poller.nextPoll();
+ try {
+ now = poller.nextPoll();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ break;
+ }
} while (true);
}
@@ -181,10 +219,10 @@ public class Monitor {
if (!streamFound) {
throw new IllegalArgumentException("stream: " + _stream + " not found");
}
- long timeDelta = now - state.lastTime;
- long stattedDelta = totalStatted - state.lastStatted;
- state.lastTime = now;
- state.lastStatted = totalStatted;
+ long timeDelta = now - state.getLastTime();
+ long stattedDelta = totalStatted - state.getLastStatted();
+ state.setLastTime(now);
+ state.setLastStatted(totalStatted);
double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
}