You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:33 UTC

[02/24] git commit: modify java reflection

modify java reflection


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ecb39a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ecb39a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ecb39a70

Branch: refs/heads/master
Commit: ecb39a70f157a5d08a0fb90d0d3319b1c9a71ceb
Parents: 7a21ae0
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:01:50 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:01:50 2014 +0800

----------------------------------------------------------------------
 .../metric/api/rpc/CombinedShellMetric.java     | 14 ++--
 .../storm/metric/api/rpc/CountShellMetric.java  | 18 ++---
 .../storm/metric/api/rpc/IShellMetric.java      |  8 +-
 .../metric/api/rpc/ReducedShellMetric.java      | 12 +--
 .../jvm/backtype/storm/spout/ShellSpout.java    | 84 ++++++++------------
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 84 ++++++++------------
 .../backtype/storm/task/TopologyContext.java    | 38 +++++----
 7 files changed, 113 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
index fd940a7..231c571 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -21,13 +21,11 @@ import backtype.storm.metric.api.CombinedMetric;
 import backtype.storm.metric.api.ICombiner;
 
 public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+    public CombinedShellMetric(ICombiner combiner) {
+        super(combiner);
+    }
 
-	public CombinedShellMetric(ICombiner combiner) {
-		super(combiner);
-	}
-
-	public void updateMetricFromRPC(Object value) {
-		update(value);
-	}
-
+    public void updateMetricFromRPC(Object value) {
+        update(value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
index 1779223..def74c2 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -26,13 +26,13 @@ public class CountShellMetric extends CountMetric implements IShellMetric {
      *  if value is null, it will call incr()
      *  if value is long, it will call incrBy((long)params)
      * */
-	public void updateMetricFromRPC(Object value) {
-		if (value == null) {
-			incr();
-		} else if (value instanceof Long) {
-			incrBy((Long)value);
-		} else {
-			throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
-		}
-	}
+    public void updateMetricFromRPC(Object value) {
+        if (value == null) {
+            incr();
+        } else if (value instanceof Long) {
+            incrBy((Long)value);
+        } else {
+            throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
index 9bec3a1..d53baea 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -20,14 +20,12 @@ package backtype.storm.metric.api.rpc;
 import backtype.storm.metric.api.IMetric;
 
 public interface IShellMetric extends IMetric {
-	public static final String SHELL_METRICS_UPDATE_METHOD_NAME = "updateMetricFromRPC";
-	
     /***
      * @function
-     * 	This interface is used by ShellBolt and ShellSpout through RPC call to update Metric 
+     *     This interface is used by ShellBolt and ShellSpout through RPC call to update Metric 
      * @param
-     * 	value used to update metric, its's meaning change according implementation
-     *  Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
+     *     value used to update metric, its's meaning change according implementation
+     *     Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
      * */
     public void updateMetricFromRPC(Object value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
index 727a709..097ed51 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -22,11 +22,11 @@ import backtype.storm.metric.api.ReducedMetric;
 
 public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
 
-	public ReducedShellMetric(IReducer reducer) {
-		super(reducer);
-	}
+    public ReducedShellMetric(IReducer reducer) {
+        super(reducer);
+    }
 
-	public void updateMetricFromRPC(Object value) {
-		update(value);
-	}
+    public void updateMetricFromRPC(Object value) {
+        update(value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index e0abe8a..c6443e4 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,6 +18,7 @@
 package backtype.storm.spout;
 
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
@@ -41,8 +42,8 @@ public class ShellSpout implements ISpout {
     private String[] _command;
     private ShellProcess _process;
     
-    private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
-
+    private TopologyContext _context;
+    
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
@@ -55,6 +56,7 @@ public class ShellSpout implements ISpout {
                      SpoutOutputCollector collector) {
         _process = new ShellProcess(_command);
         _collector = collector;
+        _context = context;
 
         try {
             Number subpid = _process.launch(stormConf, context);
@@ -101,46 +103,35 @@ public class ShellSpout implements ISpout {
     }
     
     private void handleMetrics(Map action) {
-    	//get metrics
-    	Object nameObj = action.get("name");
-    	if ( !(nameObj instanceof String) ) {
-    		throw new RuntimeException("Receive Metrics name is not String");
-    	}
-    	String name = (String) nameObj;
-    	if (name == null || name.isEmpty()) {
-    		throw new RuntimeException("Receive Metrics name is NULL");
-    	}
-    	if ( !_registeredShellMetrics.containsKey(name)) {
-    		throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
-    	}
-    	IShellMetric iMetric = _registeredShellMetrics.get(name);
-    	
-    	//get paramList
-    	Object paramsObj = action.get("params");
+        //get metric name
+        Object nameObj = action.get("name");
+        if (nameObj == null || !(nameObj instanceof String) ) {
+            throw new RuntimeException("Receive Metrics name is null or is not String");
+        }
+        String name = (String) nameObj;
+        if (name.isEmpty()) {
+            throw new RuntimeException("Receive Metrics name is empty");
+        }
         
-    	Class<? extends IShellMetric> oriClass = iMetric.getClass();
-    	Method method = null;
-		try {
-			method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
-		} catch (SecurityException e) {
-			LOG.error("handleMetrics get method ["+name+"] SecurityException");
-			throw new RuntimeException(e);
-		} catch (NoSuchMethodException e) {
-			LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
-			throw new RuntimeException(e);
-		}
-    	try {
-			method.invoke(iMetric, paramsObj);
-		} catch (IllegalArgumentException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
-			throw new RuntimeException(e);
-		} catch (InvocationTargetException e) {
-			LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
-			throw new RuntimeException(e);
-		}
+        //get metric by name
+        IMetric iMetric = _context.getRegisteredMetricByName(name);
+        if (iMetric == null) {
+            throw new RuntimeException("Not find metric by name["+name+"] ");
+        }
+        if ( !(iMetric instanceof IShellMetric)) {
+            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+        }
+        IShellMetric iShellMetric = (IShellMetric)iMetric;
+        
+        //call updateMetricFromRPC with params
+        Object paramsObj = action.get("params");
+        try {
+            iShellMetric.updateMetricFromRPC(paramsObj);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }       
     }
 
     private void querySubprocess(Object query) {
@@ -171,22 +162,13 @@ public class ShellSpout implements ISpout {
                         _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
                     }
                 } else if (command.equals("metrics")) {
-                	handleMetrics(action);
+                    handleMetrics(action);
                 }
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
-    
-    public <T extends IShellMetric> T registerMetric(String name, T metric) {
-    	if ( _registeredShellMetrics.containsKey(name) ) {
-    		throw new RuntimeException("The same metric name `" + name + "` was registered in ShellSpout twice." );
-    	} else {
-    		_registeredShellMetrics.put(name, metric);
-    	}
-    	return metric;
-    }
 
     @Override
     public void activate() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index e6cbe5e..b2bdc22 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,6 +19,7 @@ package backtype.storm.task;
 
 import backtype.storm.Config;
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
@@ -81,9 +82,9 @@ public class ShellBolt implements IBolt {
     
     private Thread _readerThread;
     private Thread _writerThread;
-
-    private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
     
+    private TopologyContext _context;
+
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
@@ -101,6 +102,7 @@ public class ShellBolt implements IBolt {
         _rand = new Random();
         _process = new ShellProcess(_command);
         _collector = collector;
+        _context = context;
 
         try {
             //subprocesses must send their pid first thing
@@ -133,7 +135,7 @@ public class ShellBolt implements IBolt {
                         } else if (command.equals("emit")) {
                             handleEmit(action);
                         } else if (command.equals("metrics")) {
-                        	handleMetrics(action);
+                            handleMetrics(action);
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -192,15 +194,6 @@ public class ShellBolt implements IBolt {
         _process.destroy();
         _inputs.clear();
     }
-
-    public <T extends IShellMetric> T registerMetric(String name, T metric) {
-    	if ( _registeredShellMetrics.containsKey(name) ) {
-    		throw new RuntimeException("The same metric name `" + name + "` was registered in ShellBolt twice." );
-    	} else {
-    		_registeredShellMetrics.put(name, metric);
-    	}
-    	return metric;
-    }
     
     private void handleAck(Map action) {
         String id = (String) action.get("id");
@@ -256,46 +249,35 @@ public class ShellBolt implements IBolt {
     }
     
     private void handleMetrics(Map action) {
-    	//get metrics
-    	Object nameObj = action.get("name");
-    	if ( !(nameObj instanceof String) ) {
-    		throw new RuntimeException("Receive Metrics name is not String");
-    	}
-    	String name = (String) nameObj;
-    	if (name == null || name.isEmpty()) {
-    		throw new RuntimeException("Receive Metrics name is NULL");
-    	}
-    	if ( !_registeredShellMetrics.containsKey(name)) {
-    		throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
-    	}
-    	IShellMetric iMetric = _registeredShellMetrics.get(name);
-    	
-    	//get paramList
-    	Object paramsObj = action.get("params");
+        //get metric name
+        Object nameObj = action.get("name");
+        if (nameObj == null || !(nameObj instanceof String) ) {
+            throw new RuntimeException("Receive Metrics name is null or is not String");
+        }
+        String name = (String) nameObj;
+        if (name.isEmpty()) {
+            throw new RuntimeException("Receive Metrics name is empty");
+        }
         
-    	Class<? extends IShellMetric> oriClass = iMetric.getClass();
-    	Method method = null;
-		try {
-			method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
-		} catch (SecurityException e) {
-			LOG.error("handleMetrics get method ["+name+"] SecurityException");
-			throw new RuntimeException(e);
-		} catch (NoSuchMethodException e) {
-			LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
-			throw new RuntimeException(e);
-		}
-    	try {
-			method.invoke(iMetric, paramsObj);
-		} catch (IllegalArgumentException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
-			throw new RuntimeException(e);
-		} catch (InvocationTargetException e) {
-			LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
-			throw new RuntimeException(e);
-		}
+        //get metric by name
+        IMetric iMetric = _context.getRegisteredMetricByName(name);
+        if (iMetric == null) {
+            throw new RuntimeException("Not find metric by name["+name+"] ");
+        }
+        if ( !(iMetric instanceof IShellMetric)) {
+            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+        }
+        IShellMetric iShellMetric = (IShellMetric)iMetric;
+        
+        //call updateMetricFromRPC with params
+        Object paramsObj = action.get("params");
+        try {
+            iShellMetric.updateMetricFromRPC(paramsObj);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }       
     }
 
     private void die(Throwable exception) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 1285739..c0fcf20 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -252,23 +252,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
         return metric;
     }
-    
-    /*
-     * Convinience method for ShellBolt to registering ShellMetric.
-     */
-    public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellBolt bolt) {
-    	bolt.registerMetric(name, metric);
-        return registerMetric(name, metric, timeBucketSizeInSecs);
-    }
-    
-    /*
-     * Convinience method for ShellSpout to registering ShellMetric.
+
+    /**
+     * Get component's metric from registered metrics by name.
+     * Notice: Normally, one component can only register one metric name once.
+     *         But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254) 
+     *         cause the same metric name can register twice.
+     *         So we just return the first metric we meet.
      */
-    public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellSpout spout) {
-    	spout.registerMetric(name, metric);
-        return registerMetric(name, metric, timeBucketSizeInSecs);
-    }
+    public IMetric getRegisteredMetricByName(String name) {
+        IMetric metric = null;
 
+        for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+            Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
+            if (nameToMetric != null) {
+                metric = nameToMetric.get(name);
+                if (metric != null) {
+                    //we just return the first metric we meet
+                    break;  
+                }
+            }
+        } 
+        
+        return metric;
+    }   
+ 
     /*
      * Convinience method for registering ReducedMetric.
      */