You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:54 UTC
[23/24] git commit: Merge branch 'master' of
https://github.com/dashengju/incubator-storm into STORM-200
Merge branch 'master' of https://github.com/dashengju/incubator-storm into STORM-200
STORM-200: Proposal for Multilang's Metrics feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ff345c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ff345c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ff345c1f
Branch: refs/heads/master
Commit: ff345c1fa9dcbe55e96037bec3b59d06c3f64cd4
Parents: 18a0721 573c42a
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 1 15:23:36 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 1 15:23:36 2014 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/testing.clj | 5 +-
.../src/dev/resources/tester_bolt_metrics.py | 35 ++++
.../src/dev/resources/tester_spout_metrics.py | 51 +++++
.../metric/api/rpc/AssignableShellMetric.java | 30 +++
.../metric/api/rpc/CombinedShellMetric.java | 31 +++
.../storm/metric/api/rpc/CountShellMetric.java | 38 ++++
.../storm/metric/api/rpc/IShellMetric.java | 31 +++
.../metric/api/rpc/ReducedShellMetric.java | 32 +++
.../storm/multilang/JsonSerializer.java | 10 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 20 ++
.../jvm/backtype/storm/spout/ShellSpout.java | 37 ++++
.../src/jvm/backtype/storm/task/ShellBolt.java | 38 ++++
.../backtype/storm/task/TopologyContext.java | 24 +++
.../storm/testing/PythonShellMetricsBolt.java | 32 +++
.../storm/testing/PythonShellMetricsSpout.java | 35 ++++
storm-core/src/multilang/py/storm.py | 3 +
.../test/clj/backtype/storm/metrics_test.clj | 206 ++++++++++++-------
17 files changed, 588 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 532545f,5ab327e..430581d
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@@ -115,9 -123,11 +123,11 @@@ public class ShellBolt implements IBol
handleError(shellMsg.getMsg());
} else if (command.equals("log")) {
String msg = shellMsg.getMsg();
- LOG.info("Shell msg: " + msg);
+ LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
} else if (command.equals("emit")) {
handleEmit(shellMsg);
+ } else if (command.equals("metrics")) {
+ handleMetrics(shellMsg);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@@ -224,10 -233,36 +234,38 @@@
shellMsg.getStream(), anchors, shellMsg.getTuple());
}
}
+
+ private void handleMetrics(ShellMsg shellMsg) { // invoked for the "metrics" command: pushes the message's params into a registered IShellMetric
+ // Read the metric name carried by the message and reject an empty one. NOTE(review): isEmpty() would throw NullPointerException if getMetricName() can return null - confirm.
+ String name = shellMsg.getMetricName();
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
+
+ // Look the metric up by name in the topology context; it must exist and must implement IShellMetric to be updatable from the shell process.
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Could not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric; // safe: guarded by the instanceof check above
+
+ // Hand the params object from the message to the metric's updateMetricFromRPC, passed through as-is.
+ Object paramsObj = shellMsg.getMetricParams();
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) { // already unchecked: rethrow unchanged rather than double-wrapping
+ throw re;
+ } catch (Exception e) { // any other exception is wrapped in a RuntimeException with the original as its cause
+ throw new RuntimeException(e);
+ }
+ }
private void die(Throwable exception) {
- _exception = exception;
+ String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); // process info string followed by its termination info string
+ _exception = new RuntimeException(processInfo, exception); // store a wrapper whose message is the process info and whose cause is the original throwable
}
+
}