You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:33 UTC
[02/24] git commit: modify java refelction
modify java refelction
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ecb39a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ecb39a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ecb39a70
Branch: refs/heads/master
Commit: ecb39a70f157a5d08a0fb90d0d3319b1c9a71ceb
Parents: 7a21ae0
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:01:50 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:01:50 2014 +0800
----------------------------------------------------------------------
.../metric/api/rpc/CombinedShellMetric.java | 14 ++--
.../storm/metric/api/rpc/CountShellMetric.java | 18 ++---
.../storm/metric/api/rpc/IShellMetric.java | 8 +-
.../metric/api/rpc/ReducedShellMetric.java | 12 +--
.../jvm/backtype/storm/spout/ShellSpout.java | 84 ++++++++------------
.../src/jvm/backtype/storm/task/ShellBolt.java | 84 ++++++++------------
.../backtype/storm/task/TopologyContext.java | 38 +++++----
7 files changed, 113 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
index fd940a7..231c571 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -21,13 +21,11 @@ import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.ICombiner;
public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+ public CombinedShellMetric(ICombiner combiner) {
+ super(combiner);
+ }
- public CombinedShellMetric(ICombiner combiner) {
- super(combiner);
- }
-
- public void updateMetricFromRPC(Object value) {
- update(value);
- }
-
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
index 1779223..def74c2 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -26,13 +26,13 @@ public class CountShellMetric extends CountMetric implements IShellMetric {
* if value is null, it will call incr()
* if value is long, it will call incrBy((long)params)
* */
- public void updateMetricFromRPC(Object value) {
- if (value == null) {
- incr();
- } else if (value instanceof Long) {
- incrBy((Long)value);
- } else {
- throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
- }
- }
+ public void updateMetricFromRPC(Object value) {
+ if (value == null) {
+ incr();
+ } else if (value instanceof Long) {
+ incrBy((Long)value);
+ } else {
+ throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
index 9bec3a1..d53baea 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -20,14 +20,12 @@ package backtype.storm.metric.api.rpc;
import backtype.storm.metric.api.IMetric;
public interface IShellMetric extends IMetric {
- public static final String SHELL_METRICS_UPDATE_METHOD_NAME = "updateMetricFromRPC";
-
/***
* @function
- * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric
+ * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric
* @param
- * value used to update metric, its's meaning change according implementation
- * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
+ * value used to update metric, its's meaning change according implementation
+ * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
* */
public void updateMetricFromRPC(Object value);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
index 727a709..097ed51 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -22,11 +22,11 @@ import backtype.storm.metric.api.ReducedMetric;
public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
- public ReducedShellMetric(IReducer reducer) {
- super(reducer);
- }
+ public ReducedShellMetric(IReducer reducer) {
+ super(reducer);
+ }
- public void updateMetricFromRPC(Object value) {
- update(value);
- }
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index e0abe8a..c6443e4 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,6 +18,7 @@
package backtype.storm.spout;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
@@ -41,8 +42,8 @@ public class ShellSpout implements ISpout {
private String[] _command;
private ShellProcess _process;
- private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
-
+ private TopologyContext _context;
+
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -55,6 +56,7 @@ public class ShellSpout implements ISpout {
SpoutOutputCollector collector) {
_process = new ShellProcess(_command);
_collector = collector;
+ _context = context;
try {
Number subpid = _process.launch(stormConf, context);
@@ -101,46 +103,35 @@ public class ShellSpout implements ISpout {
}
private void handleMetrics(Map action) {
- //get metrics
- Object nameObj = action.get("name");
- if ( !(nameObj instanceof String) ) {
- throw new RuntimeException("Receive Metrics name is not String");
- }
- String name = (String) nameObj;
- if (name == null || name.isEmpty()) {
- throw new RuntimeException("Receive Metrics name is NULL");
- }
- if ( !_registeredShellMetrics.containsKey(name)) {
- throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
- }
- IShellMetric iMetric = _registeredShellMetrics.get(name);
-
- //get paramList
- Object paramsObj = action.get("params");
+ //get metric name
+ Object nameObj = action.get("name");
+ if (nameObj == null || !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is null or is not String");
+ }
+ String name = (String) nameObj;
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
- Class<? extends IShellMetric> oriClass = iMetric.getClass();
- Method method = null;
- try {
- method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
- } catch (SecurityException e) {
- LOG.error("handleMetrics get method ["+name+"] SecurityException");
- throw new RuntimeException(e);
- } catch (NoSuchMethodException e) {
- LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
- throw new RuntimeException(e);
- }
- try {
- method.invoke(iMetric, paramsObj);
- } catch (IllegalArgumentException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
- throw new RuntimeException(e);
- }
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = action.get("params");
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private void querySubprocess(Object query) {
@@ -171,22 +162,13 @@ public class ShellSpout implements ISpout {
_collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
}
} else if (command.equals("metrics")) {
- handleMetrics(action);
+ handleMetrics(action);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
-
- public <T extends IShellMetric> T registerMetric(String name, T metric) {
- if ( _registeredShellMetrics.containsKey(name) ) {
- throw new RuntimeException("The same metric name `" + name + "` was registered in ShellSpout twice." );
- } else {
- _registeredShellMetrics.put(name, metric);
- }
- return metric;
- }
@Override
public void activate() {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index e6cbe5e..b2bdc22 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,6 +19,7 @@ package backtype.storm.task;
import backtype.storm.Config;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
@@ -81,9 +82,9 @@ public class ShellBolt implements IBolt {
private Thread _readerThread;
private Thread _writerThread;
-
- private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
+ private TopologyContext _context;
+
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -101,6 +102,7 @@ public class ShellBolt implements IBolt {
_rand = new Random();
_process = new ShellProcess(_command);
_collector = collector;
+ _context = context;
try {
//subprocesses must send their pid first thing
@@ -133,7 +135,7 @@ public class ShellBolt implements IBolt {
} else if (command.equals("emit")) {
handleEmit(action);
} else if (command.equals("metrics")) {
- handleMetrics(action);
+ handleMetrics(action);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@ -192,15 +194,6 @@ public class ShellBolt implements IBolt {
_process.destroy();
_inputs.clear();
}
-
- public <T extends IShellMetric> T registerMetric(String name, T metric) {
- if ( _registeredShellMetrics.containsKey(name) ) {
- throw new RuntimeException("The same metric name `" + name + "` was registered in ShellBolt twice." );
- } else {
- _registeredShellMetrics.put(name, metric);
- }
- return metric;
- }
private void handleAck(Map action) {
String id = (String) action.get("id");
@@ -256,46 +249,35 @@ public class ShellBolt implements IBolt {
}
private void handleMetrics(Map action) {
- //get metrics
- Object nameObj = action.get("name");
- if ( !(nameObj instanceof String) ) {
- throw new RuntimeException("Receive Metrics name is not String");
- }
- String name = (String) nameObj;
- if (name == null || name.isEmpty()) {
- throw new RuntimeException("Receive Metrics name is NULL");
- }
- if ( !_registeredShellMetrics.containsKey(name)) {
- throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
- }
- IShellMetric iMetric = _registeredShellMetrics.get(name);
-
- //get paramList
- Object paramsObj = action.get("params");
+ //get metric name
+ Object nameObj = action.get("name");
+ if (nameObj == null || !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is null or is not String");
+ }
+ String name = (String) nameObj;
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
- Class<? extends IShellMetric> oriClass = iMetric.getClass();
- Method method = null;
- try {
- method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
- } catch (SecurityException e) {
- LOG.error("handleMetrics get method ["+name+"] SecurityException");
- throw new RuntimeException(e);
- } catch (NoSuchMethodException e) {
- LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
- throw new RuntimeException(e);
- }
- try {
- method.invoke(iMetric, paramsObj);
- } catch (IllegalArgumentException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
- throw new RuntimeException(e);
- }
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = action.get("params");
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private void die(Throwable exception) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 1285739..c0fcf20 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -252,23 +252,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
return metric;
}
-
- /*
- * Convinience method for ShellBolt to registering ShellMetric.
- */
- public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellBolt bolt) {
- bolt.registerMetric(name, metric);
- return registerMetric(name, metric, timeBucketSizeInSecs);
- }
-
- /*
- * Convinience method for ShellSpout to registering ShellMetric.
+
+ /**
+ * Get component's metric from registered metrics by name.
+ * Notice: Normally, one component can only register one metric name once.
+ * But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
+ * cause the same metric name can register twice.
+ * So we just return the first metric we meet.
*/
- public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellSpout spout) {
- spout.registerMetric(name, metric);
- return registerMetric(name, metric, timeBucketSizeInSecs);
- }
+ public IMetric getRegisteredMetricByName(String name) {
+ IMetric metric = null;
+ for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+ Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
+ if (nameToMetric != null) {
+ metric = nameToMetric.get(name);
+ if (metric != null) {
+ //we just return the first metric we meet
+ break;
+ }
+ }
+ }
+
+ return metric;
+ }
+
/*
* Convinience method for registering ReducedMetric.
*/