You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:54 UTC

[23/24] git commit: Merge branch 'master' of https://github.com/dashengju/incubator-storm into STORM-200

Merge branch 'master' of https://github.com/dashengju/incubator-storm into STORM-200

STORM-200: Proposal for Multilang's Metrics feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ff345c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ff345c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ff345c1f

Branch: refs/heads/master
Commit: ff345c1fa9dcbe55e96037bec3b59d06c3f64cd4
Parents: 18a0721 573c42a
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 1 15:23:36 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 1 15:23:36 2014 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj   |   5 +-
 .../src/dev/resources/tester_bolt_metrics.py    |  35 ++++
 .../src/dev/resources/tester_spout_metrics.py   |  51 +++++
 .../metric/api/rpc/AssignableShellMetric.java   |  30 +++
 .../metric/api/rpc/CombinedShellMetric.java     |  31 +++
 .../storm/metric/api/rpc/CountShellMetric.java  |  38 ++++
 .../storm/metric/api/rpc/IShellMetric.java      |  31 +++
 .../metric/api/rpc/ReducedShellMetric.java      |  32 +++
 .../storm/multilang/JsonSerializer.java         |  10 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  20 ++
 .../jvm/backtype/storm/spout/ShellSpout.java    |  37 ++++
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  38 ++++
 .../backtype/storm/task/TopologyContext.java    |  24 +++
 .../storm/testing/PythonShellMetricsBolt.java   |  32 +++
 .../storm/testing/PythonShellMetricsSpout.java  |  35 ++++
 storm-core/src/multilang/py/storm.py            |   3 +
 .../test/clj/backtype/storm/metrics_test.clj    | 206 ++++++++++++-------
 17 files changed, 588 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 532545f,5ab327e..430581d
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@@ -115,9 -123,11 +123,11 @@@ public class ShellBolt implements IBol
                              handleError(shellMsg.getMsg());
                          } else if (command.equals("log")) {
                              String msg = shellMsg.getMsg();
 -                            LOG.info("Shell msg: " + msg);
 +                            LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
                          } else if (command.equals("emit")) {
                              handleEmit(shellMsg);
+                         } else if (command.equals("metrics")) {
+                             handleMetrics(shellMsg);
                          }
                      } catch (InterruptedException e) {
                      } catch (Throwable t) {
@@@ -224,10 -233,36 +234,38 @@@
                      shellMsg.getStream(), anchors, shellMsg.getTuple());
          }
      }
+     
+     private void handleMetrics(ShellMsg shellMsg) {
+         //get metric name
+         String name = shellMsg.getMetricName();
+         if (name.isEmpty()) {
+             throw new RuntimeException("Receive Metrics name is empty");
+         }
+         
+         //get metric by name
+         IMetric iMetric = _context.getRegisteredMetricByName(name);
+         if (iMetric == null) {
+             throw new RuntimeException("Could not find metric by name["+name+"] ");
+         }
+         if ( !(iMetric instanceof IShellMetric)) {
+             throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+         }
+         IShellMetric iShellMetric = (IShellMetric)iMetric;
+         
+         //call updateMetricFromRPC with params
+         Object paramsObj = shellMsg.getMetricParams();
+         try {
+             iShellMetric.updateMetricFromRPC(paramsObj);
+         } catch (RuntimeException re) {
+             throw re;
+         } catch (Exception e) {
+             throw new RuntimeException(e);
+         }       
+     }
  
      private void die(Throwable exception) {
 -        _exception = exception;
 +        String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
 +        _exception = new RuntimeException(processInfo, exception);
      }
 +
  }