You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:32 UTC

[01/24] git commit: add patch for new version multilang metric support

Repository: incubator-storm
Updated Branches:
  refs/heads/master 18a0721d8 -> 8da9572ac


add patch for new version multilang metric support


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7a21ae00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7a21ae00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7a21ae00

Branch: refs/heads/master
Commit: 7a21ae00d18967b9539d5febb9a04dabefb1c8de
Parents: 254ec13
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Mar 5 14:22:02 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Mar 5 14:22:02 2014 +0800

----------------------------------------------------------------------
 .../metric/api/rpc/CombinedShellMetric.java     | 33 +++++++++++
 .../storm/metric/api/rpc/CountShellMetric.java  | 38 ++++++++++++
 .../storm/metric/api/rpc/IShellMetric.java      | 33 +++++++++++
 .../metric/api/rpc/ReducedShellMetric.java      | 32 ++++++++++
 .../jvm/backtype/storm/spout/ShellSpout.java    | 61 ++++++++++++++++++++
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 59 +++++++++++++++++++
 .../backtype/storm/task/TopologyContext.java    | 20 ++++++-
 storm-core/src/multilang/py/storm.py            |  3 +
 8 files changed, 278 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
new file mode 100644
index 0000000..fd940a7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+
+public class CombinedShellMetric extends CombinedMetric implements IShellMetric { // CombinedMetric that can also be updated via IShellMetric.updateMetricFromRPC
+
+	public CombinedShellMetric(ICombiner combiner) { // combiner is passed straight to the CombinedMetric constructor
+		super(combiner);
+	}
+
+	public void updateMetricFromRPC(Object value) { // forwards the RPC value unchanged to update(value)
+		update(value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
new file mode 100644
index 0000000..1779223
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CountMetric;
+
+public class CountShellMetric extends CountMetric implements IShellMetric {
+    /***
+     * @param value
+     *  must be null or a Long; any other type makes this method throw a RuntimeException
+     *  if value is null, it will call incr()
+     *  if value is a Long, it will call incrBy((Long) value)
+     * */
+	public void updateMetricFromRPC(Object value) {
+		if (value == null) {
+			incr();
+		} else if (value instanceof Long) {
+			incrBy((Long)value);
+		} else {
+			throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
new file mode 100644
index 0000000..9bec3a1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IMetric;
+
+public interface IShellMetric extends IMetric {
+	public static final String SHELL_METRICS_UPDATE_METHOD_NAME = "updateMetricFromRPC"; // same string as the name of the method declared below
+	
+    /***
+     * @function
+     * 	Used by ShellBolt and ShellSpout to update a metric through an RPC call
+     * @param
+     * 	value used to update the metric; its meaning depends on the implementation
+     *  value can be any JSON-supported type: String, Long, Double, Boolean, null, List, Map
+     * */
+    public void updateMetricFromRPC(Object value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
new file mode 100644
index 0000000..727a709
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+public class ReducedShellMetric extends ReducedMetric implements IShellMetric { // ReducedMetric that can also be updated via IShellMetric.updateMetricFromRPC
+
+	public ReducedShellMetric(IReducer reducer) { // reducer is passed straight to the ReducedMetric constructor
+		super(reducer);
+	}
+
+	public void updateMetricFromRPC(Object value) { // forwards the RPC value unchanged to update(value)
+		update(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 67cb66f..e0abe8a 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,12 +18,17 @@
 package backtype.storm.spout;
 
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
 import backtype.storm.utils.Utils;
 import java.util.Map;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.json.simple.JSONObject;
@@ -35,6 +40,8 @@ public class ShellSpout implements ISpout {
     private SpoutOutputCollector _collector;
     private String[] _command;
     private ShellProcess _process;
+    
+    private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
 
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -92,6 +99,49 @@ public class ShellSpout implements ISpout {
         _fail.put("id", msgId);
         querySubprocess(_fail);
     }
+    
+    private void handleMetrics(Map action) { // handles a "metrics" action: validates "name", then invokes the registered metric's update method with "params"
+    	//look up the metric registered under the "name" entry of the action
+    	Object nameObj = action.get("name");
+    	if ( !(nameObj instanceof String) ) {
+    		throw new RuntimeException("Receive Metrics name is not String");
+    	}
+    	String name = (String) nameObj;
+    	if (name == null || name.isEmpty()) { // null was already rejected by the instanceof check above, so in practice this only rejects ""
+    		throw new RuntimeException("Receive Metrics name is NULL");
+    	}
+    	if ( !_registeredShellMetrics.containsKey(name)) {
+    		throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
+    	}
+    	IShellMetric iMetric = _registeredShellMetrics.get(name);
+    	
+    	//value handed to the metric, taken as-is from the "params" entry of the action
+    	Object paramsObj = action.get("params");
+        
+    	Class<? extends IShellMetric> oriClass = iMetric.getClass(); // the update method is resolved reflectively on the metric's runtime class
+    	Method method = null;
+		try {
+			method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
+		} catch (SecurityException e) {
+			LOG.error("handleMetrics get method ["+name+"] SecurityException"); // NOTE(review): e is not passed to the logger; it only survives as the cause of the rethrown exception
+			throw new RuntimeException(e);
+		} catch (NoSuchMethodException e) {
+			LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
+			throw new RuntimeException(e);
+		}
+    	try {
+			method.invoke(iMetric, paramsObj);
+		} catch (IllegalArgumentException e) {
+			LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
+			throw new RuntimeException(e);
+		} catch (IllegalAccessException e) {
+			LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
+			throw new RuntimeException(e);
+		} catch (InvocationTargetException e) { // raised when the invoked update method itself throws
+			LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
+			throw new RuntimeException(e);
+		}
+    }
 
     private void querySubprocess(Object query) {
         try {
@@ -120,12 +170,23 @@ public class ShellSpout implements ISpout {
                     } else {
                         _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
                     }
+                } else if (command.equals("metrics")) {
+                	handleMetrics(action);
                 }
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
+    
+    public <T extends IShellMetric> T registerMetric(String name, T metric) { // stores metric under name in _registeredShellMetrics and returns the same metric
+    	if ( _registeredShellMetrics.containsKey(name) ) { // a name that is already present is rejected rather than overwritten
+    		throw new RuntimeException("The same metric name `" + name + "` was registered in ShellSpout twice." );
+    	} else {
+    		_registeredShellMetrics.put(name, metric);
+    	}
+    	return metric;
+    }
 
     @Override
     public void activate() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 118e90e..e6cbe5e 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,11 +19,14 @@ package backtype.storm.task;
 
 import backtype.storm.Config;
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.Utils;
 import backtype.storm.utils.ShellProcess;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;
@@ -79,6 +82,8 @@ public class ShellBolt implements IBolt {
     private Thread _readerThread;
     private Thread _writerThread;
 
+    private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
+    
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
@@ -127,6 +132,8 @@ public class ShellBolt implements IBolt {
                             LOG.info("Shell msg: " + msg);
                         } else if (command.equals("emit")) {
                             handleEmit(action);
+                        } else if (command.equals("metrics")) {
+                        	handleMetrics(action);
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -186,6 +193,15 @@ public class ShellBolt implements IBolt {
         _inputs.clear();
     }
 
+    public <T extends IShellMetric> T registerMetric(String name, T metric) { // stores metric under name in _registeredShellMetrics and returns the same metric
+    	if ( _registeredShellMetrics.containsKey(name) ) { // a name that is already present is rejected rather than overwritten
+    		throw new RuntimeException("The same metric name `" + name + "` was registered in ShellBolt twice." );
+    	} else {
+    		_registeredShellMetrics.put(name, metric);
+    	}
+    	return metric;
+    }
+    
     private void handleAck(Map action) {
         String id = (String) action.get("id");
         Tuple acked = _inputs.remove(id);
@@ -238,6 +254,49 @@ public class ShellBolt implements IBolt {
             _collector.emitDirect((int)task.longValue(), stream, anchors, tuple);
         }
     }
+    
+    private void handleMetrics(Map action) { // handles a "metrics" action: validates "name", then invokes the registered metric's update method with "params"
+    	//look up the metric registered under the "name" entry of the action
+    	Object nameObj = action.get("name");
+    	if ( !(nameObj instanceof String) ) {
+    		throw new RuntimeException("Receive Metrics name is not String");
+    	}
+    	String name = (String) nameObj;
+    	if (name == null || name.isEmpty()) { // null was already rejected by the instanceof check above, so in practice this only rejects ""
+    		throw new RuntimeException("Receive Metrics name is NULL");
+    	}
+    	if ( !_registeredShellMetrics.containsKey(name)) {
+    		throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
+    	}
+    	IShellMetric iMetric = _registeredShellMetrics.get(name);
+    	
+    	//value handed to the metric, taken as-is from the "params" entry of the action
+    	Object paramsObj = action.get("params");
+        
+    	Class<? extends IShellMetric> oriClass = iMetric.getClass(); // the update method is resolved reflectively on the metric's runtime class
+    	Method method = null;
+		try {
+			method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
+		} catch (SecurityException e) {
+			LOG.error("handleMetrics get method ["+name+"] SecurityException"); // NOTE(review): e is not passed to the logger; it only survives as the cause of the rethrown exception
+			throw new RuntimeException(e);
+		} catch (NoSuchMethodException e) {
+			LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
+			throw new RuntimeException(e);
+		}
+    	try {
+			method.invoke(iMetric, paramsObj);
+		} catch (IllegalArgumentException e) {
+			LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
+			throw new RuntimeException(e);
+		} catch (IllegalAccessException e) {
+			LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
+			throw new RuntimeException(e);
+		} catch (InvocationTargetException e) { // raised when the invoked update method itself throws
+			LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
+			throw new RuntimeException(e);
+		}
+    }
 
     private void die(Throwable exception) {
         _exception = exception;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index c9df979..1285739 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -26,6 +26,8 @@ import backtype.storm.metric.api.IReducer;
 import backtype.storm.metric.api.ICombiner;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
+import backtype.storm.spout.ShellSpout;
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
@@ -250,6 +252,22 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
         return metric;
     }
+    
+    /*
+     * Convenience method: registers a shell metric with the given ShellBolt and then with this context.
+     */
+    public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellBolt bolt) {
+    	bolt.registerMetric(name, metric);
+        return registerMetric(name, metric, timeBucketSizeInSecs);
+    }
+    
+    /*
+     * Convenience method: registers a shell metric with the given ShellSpout and then with this context.
+     */
+    public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellSpout spout) {
+    	spout.registerMetric(name, metric);
+        return registerMetric(name, metric, timeBucketSizeInSecs);
+    }
 
     /*
      * Convinience method for registering ReducedMetric.
@@ -263,4 +281,4 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
         return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index bec3f0c..c0387b4 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -135,6 +135,9 @@ def reportError(msg):
 
 def log(msg):
     sendMsgToParent({"command": "log", "msg": msg})
+    
+def rpcMetrics(name, params):  # passes a "metrics" command carrying name and params to sendMsgToParent
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
 
 def initComponent():
     setupInfo = readMsg()


[23/24] git commit: Merge branch 'master' of https://github.com/dashengju/incubator-storm into STORM-200

Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/dashengju/incubator-storm into STORM-200

STORM-200: Proposal for Multilang's Metrics feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ff345c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ff345c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ff345c1f

Branch: refs/heads/master
Commit: ff345c1fa9dcbe55e96037bec3b59d06c3f64cd4
Parents: 18a0721 573c42a
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 1 15:23:36 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 1 15:23:36 2014 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj   |   5 +-
 .../src/dev/resources/tester_bolt_metrics.py    |  35 ++++
 .../src/dev/resources/tester_spout_metrics.py   |  51 +++++
 .../metric/api/rpc/AssignableShellMetric.java   |  30 +++
 .../metric/api/rpc/CombinedShellMetric.java     |  31 +++
 .../storm/metric/api/rpc/CountShellMetric.java  |  38 ++++
 .../storm/metric/api/rpc/IShellMetric.java      |  31 +++
 .../metric/api/rpc/ReducedShellMetric.java      |  32 +++
 .../storm/multilang/JsonSerializer.java         |  10 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  20 ++
 .../jvm/backtype/storm/spout/ShellSpout.java    |  37 ++++
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  38 ++++
 .../backtype/storm/task/TopologyContext.java    |  24 +++
 .../storm/testing/PythonShellMetricsBolt.java   |  32 +++
 .../storm/testing/PythonShellMetricsSpout.java  |  35 ++++
 storm-core/src/multilang/py/storm.py            |   3 +
 .../test/clj/backtype/storm/metrics_test.clj    | 206 ++++++++++++-------
 17 files changed, 588 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 532545f,5ab327e..430581d
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@@ -115,9 -123,11 +123,11 @@@ public class ShellBolt implements IBol
                              handleError(shellMsg.getMsg());
                          } else if (command.equals("log")) {
                              String msg = shellMsg.getMsg();
 -                            LOG.info("Shell msg: " + msg);
 +                            LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
                          } else if (command.equals("emit")) {
                              handleEmit(shellMsg);
+                         } else if (command.equals("metrics")) {
+                             handleMetrics(shellMsg);
                          }
                      } catch (InterruptedException e) {
                      } catch (Throwable t) {
@@@ -224,10 -233,36 +234,38 @@@
                      shellMsg.getStream(), anchors, shellMsg.getTuple());
          }
      }
+     
 +     private void handleMetrics(ShellMsg shellMsg) {
 +         //get metric name
 +         String name = shellMsg.getMetricName();
 +         if (name.isEmpty()) { // NOTE(review): name is dereferenced without a null check - confirm getMetricName() never returns null
 +             throw new RuntimeException("Receive Metrics name is empty");
 +         }
 +         
 +         //get metric by name
 +         IMetric iMetric = _context.getRegisteredMetricByName(name);
 +         if (iMetric == null) {
 +             throw new RuntimeException("Could not find metric by name["+name+"] ");
 +         }
 +         if ( !(iMetric instanceof IShellMetric)) { // only metrics implementing IShellMetric expose updateMetricFromRPC
 +             throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
 +         }
 +         IShellMetric iShellMetric = (IShellMetric)iMetric;
 +         
 +         //call updateMetricFromRPC with params
 +         Object paramsObj = shellMsg.getMetricParams();
 +         try {
 +             iShellMetric.updateMetricFromRPC(paramsObj);
 +         } catch (RuntimeException re) { // runtime exceptions are rethrown unchanged; anything else is wrapped below
 +             throw re;
 +         } catch (Exception e) {
 +             throw new RuntimeException(e);
 +         }       
 +     }
  
      private void die(Throwable exception) {
 -        _exception = exception;
 +        String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
 +        _exception = new RuntimeException(processInfo, exception);
      }
 +
  }


[05/24] git commit: remove no use import

Posted by bo...@apache.org.
remove no use import


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/db709fe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/db709fe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/db709fe3

Branch: refs/heads/master
Commit: db709fe3eeabd98e5b44d3f5d84c1b5cfe74445a
Parents: 98a2720
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:13:55 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:13:55 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 3 ---
 storm-core/src/jvm/backtype/storm/task/ShellBolt.java   | 2 --
 2 files changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db709fe3/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index c6443e4..f54cf72 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,10 +25,7 @@ import backtype.storm.utils.ShellProcess;
 import backtype.storm.utils.Utils;
 import java.util.Map;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db709fe3/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index b2bdc22..de3c3f2 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -26,8 +26,6 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.Utils;
 import backtype.storm.utils.ShellProcess;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;


[03/24] git commit: add AssignableShellMetric.java to rpc

Posted by bo...@apache.org.
add AssignableShellMetric.java to rpc


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5448955d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5448955d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5448955d

Branch: refs/heads/master
Commit: 5448955dba2ffee133b1bd8b4d144611fcdcc751
Parents: ecb39a7
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:03:09 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:03:09 2014 +0800

----------------------------------------------------------------------
 .../storm/metric/api/rpc/AssignableShellMetric.java    | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5448955d/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
new file mode 100644
index 0000000..e0722b8
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
@@ -0,0 +1,13 @@
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.AssignableMetric;
+
+public class AssignableShellMetric extends AssignableMetric implements IShellMetric { // AssignableMetric that can also be updated via IShellMetric.updateMetricFromRPC
+    public AssignableShellMetric(Object value) { // value is passed straight to the AssignableMetric constructor
+        super(value);
+    }
+
+    public void updateMetricFromRPC(Object value) { // forwards the RPC value unchanged to setValue(value)
+        setValue(value);
+    }
+}


[21/24] git commit: Merge branch 'master' of github.com:dashengju/incubator-storm

Posted by bo...@apache.org.
Merge branch 'master' of github.com:dashengju/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b004e064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b004e064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b004e064

Branch: refs/heads/master
Commit: b004e064706dc2ee2c90c54325b9f770c6c116ef
Parents: 4bab2b8 d854ecc
Author: dashengju <da...@qq.com>
Authored: Fri Jun 20 10:35:07 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Jun 20 10:35:07 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 2 +-
 storm-core/src/jvm/backtype/storm/task/ShellBolt.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[13/24] git commit: change log format

Posted by bo...@apache.org.
change log format


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4d6a27f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4d6a27f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4d6a27f4

Branch: refs/heads/master
Commit: 4d6a27f466b942cd011efb0ffd538f0c04daf047
Parents: 0427e06
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Jun 12 17:46:08 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Jun 12 17:46:08 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 67 ++++++++++++++++----
 .../src/clj/backtype/storm/daemon/task.clj      |  4 +-
 2 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4d6a27f4/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 1bbe53d..64e60be 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -18,6 +18,7 @@
   (:use [backtype.storm bootstrap])
   (:import [backtype.storm.hooks ITaskHook])
   (:import [backtype.storm.tuple Tuple])
+  (:import [backtype.storm.tuple MessageId])
   (:import [backtype.storm.spout ISpoutWaitStrategy])
   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -389,12 +390,25 @@
   (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
         task-ids (:task-ids executor-data)
         debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
+        component-id (:component-id executor-data)
+        executor-id (:executor-id executor-data)
+        executor-type (:type executor-data)
         ]
     (disruptor/clojure-handler
       (fn [tuple-batch sequence-id end-of-batch?]
         (fast-list-iter [[task-id msg] tuple-batch]
-          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
-            (when debug? (log-message "Processing received message " tuple))
+          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))
+                tuple-streamid (.getSourceStreamId tuple)
+                tuple-source (.getSourceComponent tuple)
+                tuple-id (.getMessageId tuple)
+                tuple-values (.getValues tuple)
+                ]
+            (when debug? 
+              (if (= tuple-streamid "default")
+                (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-id "] TupleValue[" tuple-values "]")
+                (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-values "]")
+                )
+              )
             (if task-id
               (tuple-action-fn task-id tuple)
               ;; null task ids are broadcast tuples
@@ -421,6 +435,7 @@
         last-active (atom false)        
         spouts (ArrayList. (map :object (vals task-datas)))
         rand (Random. (Utils/secureRandomLong))
+        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
         
         pending (RotatingMap.
                  2 ;; microoptimize for performance of .size method
@@ -428,9 +443,12 @@
                    (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
                      (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                        (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
+                       (when debug? 
+                         (log-message "Component[" component-id "] FAILED-TUPLE reason[EXPIRED] TupleId[" msg-id "] values[" tuple-info "]"))
                        ))))
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          (let [stream-id (.getSourceStreamId tuple)]
+                          (let [stream-id (.getSourceStreamId tuple)
+                                tuple-id (.getMessageId tuple)]
                             (condp = stream-id
                               Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
@@ -441,10 +459,18 @@
                                     (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
                                   (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                                     (condp = stream-id
-                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
-                                                                         spout-id tuple-finished-info time-delta)
-                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
-                                                                           spout-id tuple-finished-info time-delta)
+                                      ACKER-ACK-STREAM-ID (do 
+                                                            (ack-spout-msg executor-data (get task-datas task-id)
+                                                                            spout-id tuple-finished-info time-delta)
+                                                            (when debug?
+                                                              (log-message "Component[" component-id "] ACK-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
+                                                            )
+                                      ACKER-FAIL-STREAM-ID (do 
+                                                             (fail-spout-msg executor-data (get task-datas task-id)
+                                                                              spout-id tuple-finished-info time-delta)
+                                                             (when debug?
+                                                              (log-message "Component[" component-id "] FAILED-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
+                                                             )
                                       )))
                                 ;; TODO: on failure, emit tuple to failure stream
                                 ))))
@@ -493,6 +519,8 @@
                                                            (transfer-fn out-task
                                                                         out-tuple
                                                                         overflow-buffer)
+                                                           (when debug? 
+                                                             (log-message "Component[" component-id "] Type[EMIT] to Stream[" out-stream-id "] TupleId[" tuple-id "] values[" values "]"))
                                                            ))
                                          (if rooted?
                                            (do
@@ -598,6 +626,8 @@
         {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                 open-or-prepare-was-called?]} executor-data
         rand (Random. (Utils/secureRandomLong))
+        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
+
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           ;; synchronization needs to be done with a key provided by this bolt, otherwise:
                           ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -660,7 +690,13 @@
                                                     (tasks-fn task stream values)
                                                     (tasks-fn stream values))]
                                     (fast-list-iter [t out-tasks]
-                                                    (let [anchors-to-ids (HashMap.)]
+                                                    (let [anchors-to-ids (HashMap.)
+                                                          out-tuple (TupleImpl. worker-context
+                                                                               values
+                                                                               task-id
+                                                                               stream
+                                                                               (MessageId/makeId anchors-to-ids))
+                                                          ]
                                                       (fast-list-iter [^TupleImpl a anchors]
                                                                       (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
                                                                         (when (pos? (count root-ids))
@@ -669,12 +705,15 @@
                                                                             (fast-list-iter [root-id root-ids]
                                                                                             (put-xor! anchors-to-ids root-id edge-id))
                                                                             ))))
-                                                      (transfer-fn t
-                                                                   (TupleImpl. worker-context
-                                                                               values
-                                                                               task-id
-                                                                               stream
-                                                                               (MessageId/makeId anchors-to-ids)))))
+                                                      (transfer-fn t out-tuple)
+                                                      (when debug? 
+                                                        (if (= component-id "__acker")
+                                                          (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.get values 0) "]")
+                                                          (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.getMessageId out-tuple) "] values[" values "]")
+                                                          )
+                                                        
+                                                        )
+                                                      ))
                                     (or out-tasks [])))]]
           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
           (if (= component-id Constants/SYSTEM_COMPONENT_ID)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4d6a27f4/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 3650150..29756a1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -131,7 +131,7 @@
         
     (fn ([^Integer out-task-id ^String stream ^List values]
           (when debug?
-            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
+            (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
           (let [target-component (.getComponentId worker-context out-task-id)
                 component->grouping (get stream->component->grouper stream)
                 grouping (get component->grouping target-component)
@@ -149,7 +149,7 @@
             ))
         ([^String stream ^List values]
            (when debug?
-             (log-message "Emitting: " component-id " " stream " " values))
+             (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
            (let [out-tasks (ArrayList.)]
              (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
                (when (= :direct grouper)


[20/24] git commit: change metrics_test to use simulated-time-local-cluster and fix the hang problem with assert-buckets

Posted by bo...@apache.org.
change metrics_test to use simulated-time-local-cluster and fix the hang problem with assert-buckets


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4bab2b85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4bab2b85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4bab2b85

Branch: refs/heads/master
Commit: 4bab2b857d0d481d1859336adeb87d7245c6e1c3
Parents: ecd0adb
Author: dashengju <da...@qq.com>
Authored: Fri Jun 20 10:34:38 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Jun 20 10:34:38 2014 +0800

----------------------------------------------------------------------
 .../src/dev/resources/tester_spout_metrics.py   |  12 +-
 .../test/clj/backtype/storm/metrics_test.clj    | 156 ++++++++++---------
 2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/src/dev/resources/tester_spout_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout_metrics.py b/storm-core/src/dev/resources/tester_spout_metrics.py
index a91128a..0480566 100644
--- a/storm-core/src/dev/resources/tester_spout_metrics.py
+++ b/storm-core/src/dev/resources/tester_spout_metrics.py
@@ -32,16 +32,14 @@ class TesterSpout(storm.Spout):
         self.count = 0
 
     def nextTuple(self):
-        sleep(1)
-        storm.log("TesterSpout emit a tuple")
-        word = choice(words)
-        id = str(uuid4())
-        self.pending[id] = word
         if self.count < 2:
+            word = choice(words)
+            id = str(uuid4())
+            self.pending[id] = word
             storm.rpcMetrics("my-custom-shellspout-metric", 1)
-            storm.log("TesterSpout update my-custom-shellspout-metric")
             self.count = self.count + 1
-        storm.emit([word], id=id)
+            storm.log("TesterSpout update my-custom-shellspout-metric "+str(self.count))
+            storm.emit([word], id=id)
 
     def ack(self, id):
         del self.pending[id]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index 4aaa3dc..4356d30 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -70,15 +70,17 @@
 
 (def metrics-data backtype.storm.metric.testing/buffer)
 
-(defn wait-for-atleast-N-buckets! [N comp-id metric-name]
-  (while
+(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
+  (while-timeout TEST-TIMEOUT-MS
       (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
         (or
          (and (not= N 0) (nil? taskid->buckets))
          (not-every? #(<= N %) (map (comp count second) taskid->buckets))))
-;;    (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
-;;             "and metric name" metric-name)
-    (Thread/sleep 10)))
+      ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
+    (if cluster
+      (advance-cluster-time cluster 1)
+      (Thread/sleep 10))))
+    
 
 (defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
   (-> @metrics-data
@@ -88,10 +90,10 @@
       (second)
       (or [])))
 
-(defmacro assert-buckets! [comp-id metric-name expected]
+(defmacro assert-buckets! [comp-id metric-name expected cluster]
   `(do
      (let [N# (count ~expected)]
-       (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
+       (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster)
        (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
 
 (defmacro assert-metric-data-exists! [comp-id metric-name]
@@ -112,18 +114,18 @@
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
-      (assert-buckets! "2" "my-custom-metric" [1])
+      (assert-buckets! "2" "my-custom-metric" [1] cluster)
             
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
 
       (advance-cluster-time cluster 20)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
       
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)               
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (deftest test-custom-metric-with-multi-tasks
   (with-simulated-time-local-cluster
@@ -140,18 +142,18 @@
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
-      (assert-buckets! "2" "my-custom-metric" [1])
+      (assert-buckets! "2" "my-custom-metric" [1] cluster)
 
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
 
       (advance-cluster-time cluster 20)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
 
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (defn mk-shell-bolt-with-metrics-spec
   [inputs command & kwargs]
@@ -160,7 +162,7 @@
          (PythonShellMetricsBolt. command) kwargs)))
 
 (deftest test-custom-metric-with-multilang-py
-  (with-local-cluster 
+  (with-simulated-time-local-cluster 
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                        [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
@@ -173,19 +175,19 @@
       (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
-      (Thread/sleep 6000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1])
+      (advance-cluster-time cluster 6)
+      (assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
             
-      (Thread/sleep 5000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1 0])
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
 
-      (Thread/sleep 20000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0])
+      (advance-cluster-time cluster 20)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
       
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)               
-      (Thread/sleep 5000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2])
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
       )))
 
 (defn mk-shell-spout-with-metrics-spec
@@ -194,7 +196,7 @@
     (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
 
 (deftest test-custom-metric-with-spout-multilang-py
-  (with-local-cluster 
+  (with-simulated-time-local-cluster 
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                        [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
@@ -204,8 +206,8 @@
                      {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
       (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
 
-      (Thread/sleep 7000)
-      (assert-buckets! "1" "my-custom-shellspout-metric" [2])
+      (advance-cluster-time cluster 7)
+      (assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
       )))
 
 
@@ -223,27 +225,27 @@
       
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 61)
-      (assert-buckets! "myspout" "__ack-count/default" [1])
-      (assert-buckets! "myspout" "__emit-count/default" [1])
-      (assert-buckets! "myspout" "__transfer-count/default" [1])            
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+      (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
 
       (advance-cluster-time cluster 120)
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0])
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0] cluster)
 
       (.feed feeder ["b"] 1)
       (.feed feeder ["c"] 1)
       (advance-cluster-time cluster 60)
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2])
-      (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2])      
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2]))))
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)      
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2] cluster))))
 
 
 (deftest test-builtin-metrics-2
@@ -266,36 +268,36 @@
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
       (assert-acked tracker 1)
-      (assert-buckets! "myspout" "__fail-count/default" [])
-      (assert-buckets! "myspout" "__ack-count/default" [1])
-      (assert-buckets! "myspout" "__emit-count/default" [1])
-      (assert-buckets! "myspout" "__transfer-count/default" [1])            
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])     
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+      (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+      (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)     
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
 
       (.feed feeder ["b"] 2)      
       (advance-cluster-time cluster 5)
-      (assert-buckets! "myspout" "__fail-count/default" [])
-      (assert-buckets! "myspout" "__ack-count/default" [1 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 1])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1])                  
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
+      (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+      (assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1] cluster)                  
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
 
       (advance-cluster-time cluster 15)      
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
       
       (.feed feeder ["c"] 3)            
       (advance-cluster-time cluster 15)      
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
 
 (deftest test-builtin-metrics-3
   (with-simulated-time-local-cluster
@@ -319,21 +321,21 @@
       (.feed feeder ["c"] 3)
       (advance-cluster-time cluster 9)
       (assert-acked tracker 1 3)
-      (assert-buckets! "myspout" "__ack-count/default" [2])
-      (assert-buckets! "myspout" "__emit-count/default" [3])
-      (assert-buckets! "myspout" "__transfer-count/default" [3])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+      (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
       
       (is (not (.isFailed tracker 2)))
       (advance-cluster-time cluster 30)
       (assert-failed tracker 2)
-      (assert-buckets! "myspout" "__fail-count/default" [1])
-      (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0]))))
+      (assert-buckets! "myspout" "__fail-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
 
 (deftest test-system-bolt
   (with-simulated-time-local-cluster
@@ -348,12 +350,12 @@
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 70)
-      (assert-buckets! "__system" "newWorkerEvent" [1])
+      (assert-buckets! "__system" "newWorkerEvent" [1] cluster)
       (assert-metric-data-exists! "__system" "uptimeSecs")
       (assert-metric-data-exists! "__system" "startTimeSecs")
 
       (advance-cluster-time cluster 180)
-      (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0])
+      (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
       )))
 
 


[06/24] git commit: remove TopologyContext.java unused import

Posted by bo...@apache.org.
remove TopologyContext.java unused import


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e2a7f7c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e2a7f7c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e2a7f7c0

Branch: refs/heads/master
Commit: e2a7f7c0e1a5a413840948342f4be86d9ba56516
Parents: db709fe
Author: dashengju <da...@qq.com>
Authored: Fri Mar 7 09:48:58 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Mar 7 09:48:58 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/task/TopologyContext.java | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e2a7f7c0/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index c0fcf20..dfbad0c 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -26,8 +26,6 @@ import backtype.storm.metric.api.IReducer;
 import backtype.storm.metric.api.ICombiner;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.rpc.IShellMetric;
-import backtype.storm.spout.ShellSpout;
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;


[12/24] git commit: remove ui topology action

Posted by bo...@apache.org.
remove ui topology action


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0427e06e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0427e06e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0427e06e

Branch: refs/heads/master
Commit: 0427e06e52776b1f4e4ad299d198ee7a1384bae6
Parents: b3191fc
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 11 23:00:02 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 11 23:00:02 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/ui/core.clj | 74 +++++++++++-----------
 storm-core/src/ui/public/topology.html        |  6 +-
 storm-dist/binary/pom.xml                     |  2 +-
 3 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0427e06e/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 5f2bcba..4098038 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -780,43 +780,43 @@
        (let [id (url-decode id)
              component (url-decode component)]
          (json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
-  (POST "/api/v1/topology/:id/activate" [id]
-    (with-nimbus nimbus
-      (let [id (url-decode id)
-            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-            name (.get_name tplg)]
-        (.activate nimbus name)
-        (log-message "Activating topology '" name "'")))
-    (resp/redirect (str "/api/v1/topology/" id)))
-
-  (POST "/api/v1/topology/:id/deactivate" [id]
-    (with-nimbus nimbus
-      (let [id (url-decode id)
-            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-            name (.get_name tplg)]
-        (.deactivate nimbus name)
-        (log-message "Deactivating topology '" name "'")))
-    (resp/redirect (str "/api/v1/topology/" id)))
-  (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
-    (with-nimbus nimbus
-      (let [id (url-decode id)
-            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-            name (.get_name tplg)
-            options (RebalanceOptions.)]
-        (.set_wait_secs options (Integer/parseInt wait-time))
-        (.rebalance nimbus name options)
-        (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
-    (resp/redirect (str "/api/v1/topology/" id)))
-  (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
-    (with-nimbus nimbus
-      (let [id (url-decode id)
-            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-            name (.get_name tplg)
-            options (KillOptions.)]
-        (.set_wait_secs options (Integer/parseInt wait-time))
-        (.killTopologyWithOpts nimbus name options)
-        (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
-    (resp/redirect (str "/api/v1/topology/" id)))
+  ;(POST "/api/v1/topology/:id/activate" [id]
+  ;  (with-nimbus nimbus
+  ;    (let [id (url-decode id)
+  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+  ;          name (.get_name tplg)]
+  ;      (.activate nimbus name)
+  ;      (log-message "Activating topology '" name "'")))
+  ;  (resp/redirect (str "/api/v1/topology/" id)))
+
+  ;(POST "/api/v1/topology/:id/deactivate" [id]
+  ;  (with-nimbus nimbus
+  ;    (let [id (url-decode id)
+  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+  ;          name (.get_name tplg)]
+  ;      (.deactivate nimbus name)
+  ;      (log-message "Deactivating topology '" name "'")))
+  ;  (resp/redirect (str "/api/v1/topology/" id)))
+  ;(POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+  ;  (with-nimbus nimbus
+  ;    (let [id (url-decode id)
+  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+  ;          name (.get_name tplg)
+  ;          options (RebalanceOptions.)]
+  ;      (.set_wait_secs options (Integer/parseInt wait-time))
+  ;      (.rebalance nimbus name options)
+  ;      (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+  ;  (resp/redirect (str "/api/v1/topology/" id)))
+  ;(POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+  ;  (with-nimbus nimbus
+  ;    (let [id (url-decode id)
+  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+  ;          name (.get_name tplg)
+  ;          options (KillOptions.)]
+  ;      (.set_wait_secs options (Integer/parseInt wait-time))
+  ;      (.killTopologyWithOpts nimbus name options)
+  ;      (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+  ;  (resp/redirect (str "/api/v1/topology/" id)))
 
   (GET "/" [:as {cookies :cookies}]
        (resp/redirect "/index.html"))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0427e06e/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index df095ad..9cb08cd 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -19,11 +19,13 @@
 <h2>Topology summary</h2>
 <div id="topology-summary">
 </div>
+<!--
 <div id="topology-actions">
 <h2 class="js-only">Topology actions</h2>
 <p id="topology-actions" class="js-only">
 </p>
 </div>
+-->
 <div id="topology-stats"></div>
 <div id="spout-stats">
 </div>
@@ -60,13 +62,13 @@ $(document).ready(function() {
         var spoutStats = $("#spout-stats");
         var boltStats = $("#bolt-stats");
         var config = $("#topology-configuration");
-        var topologyActions = $("#topology-actions");
+        //var topologyActions = $("#topology-actions");
         var topologyVisualization = $("#topology-visualization")
         var formattedConfig = formatConfigData(response["configuration"]);
         var buttonJsonData = topologyActionJson(response["id"],response["name"],response["status"],response["msgTimeout"]);
         $.get("/templates/topology-page-template.html", function(template) {
             topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
-            topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
+            //topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
             topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response));
             $("#topology-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
             spoutStats.append(Mustache.render($(template).filter("#spout-stats-template").html(),response));

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0427e06e/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 4fa3a27..bb1b876 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.2-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-mt0000</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>


[04/24] git commit: add AssignableShellMetric.java to rpc

Posted by bo...@apache.org.
add AssignableShellMetric.java to rpc


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/98a27203
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/98a27203
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/98a27203

Branch: refs/heads/master
Commit: 98a27203169abf2d7a2bcad163448c488d8d0d40
Parents: 5448955
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:04:57 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:04:57 2014 +0800

----------------------------------------------------------------------
 .../metric/api/rpc/AssignableShellMetric.java      | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/98a27203/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
index e0722b8..20387ed 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package backtype.storm.metric.api.rpc;
 
 import backtype.storm.metric.api.AssignableMetric;


[18/24] git commit: Merge branch 'master' of github.com:dashengju/incubator-storm

Posted by bo...@apache.org.
Merge branch 'master' of github.com:dashengju/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e33023c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e33023c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e33023c7

Branch: refs/heads/master
Commit: e33023c7e7f5ec10cbc2d5eade7d68102bfbb317
Parents: 62f854a ecd0adb
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 18 14:29:31 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 18 14:29:31 2014 +0800

----------------------------------------------------------------------
 .../src/dev/resources/tester_bolt_metrics.py    | 35 +++++++++
 .../src/dev/resources/tester_spout_metrics.py   | 53 ++++++++++++++
 .../storm/testing/PythonShellMetricsBolt.java   | 32 +++++++++
 .../storm/testing/PythonShellMetricsSpout.java  | 35 +++++++++
 .../test/clj/backtype/storm/metrics_test.clj    | 74 ++++++++++++++++++--
 5 files changed, 225 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[10/24] git commit: support metrics and cglimit

Posted by bo...@apache.org.
support metrics and cglimit


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/fbaf7278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/fbaf7278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/fbaf7278

Branch: refs/heads/master
Commit: fbaf7278940925f27e24f436d1917aee28b23d3c
Parents: 0279d72
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 11 16:35:38 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 11 16:35:38 2014 +0800

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    |  4 ++-
 .../storm/multilang/JsonSerializer.java         | 10 +++++++
 .../jvm/backtype/storm/multilang/ShellMsg.java  | 20 ++++++++++++++
 .../jvm/backtype/storm/spout/ShellSpout.java    | 12 +++-----
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 29 +++++---------------
 storm-core/src/multilang/py/storm.py            |  2 +-
 6 files changed, 45 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 556653a..53b2802 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -473,8 +473,10 @@
           topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
                                   (substitute-worker-childopts s port))
           logfilename (str "worker-" port ".log")
+          worker-childcgroup (when-let [s (conf WORKER-CHILDCGROUP)] 
+                                (.split s " "))
           command (concat
-                    (conf WORKER-CHILDCGROUP)
+                    worker-childcgroup
 		    [(java-cmd) "-server"]
                     worker-childopts
                     topo-worker-childopts

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index dd5773f..48405e0 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -118,6 +118,16 @@ public class JsonSerializer implements ISerializer {
                 shellMsg.addAnchor((String) o);
             }
         }
+       
+        Object nameObj = msg.get("name"); 
+        String metricName = null;
+        if (nameObj != null && nameObj instanceof String) {
+            metricName = (String) nameObj;
+        }
+        shellMsg.setMetricName(metricName);
+        
+        Object paramsObj = msg.get("params");
+        shellMsg.setMetricParams(paramsObj); 
 
         return shellMsg;
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
index 1747f5b..ef87536 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -25,6 +25,10 @@ public class ShellMsg {
     private List<Object> tuple;
     private boolean needTaskIds;
 
+    //metrics rpc 
+    private String metricName;
+    private Object metricParams;
+
     public String getCommand() {
         return command;
     }
@@ -102,4 +106,20 @@ public class ShellMsg {
     public void setNeedTaskIds(boolean needTaskIds) {
         this.needTaskIds = needTaskIds;
     }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public String getMetricName() {
+        return this.metricName;
+    }
+
+    public void setMetricParams(Object metricParams) {
+        this.metricParams = metricParams;
+    }
+
+    public Object getMetricParams() {
+        return metricParams;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 368f16a..9f4b38e 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -93,13 +93,9 @@ public class ShellSpout implements ISpout {
         querySubprocess();
     }
     
-    private void handleMetrics(Map action) {
+    private void handleMetrics(ShellMsg shellMsg) {
         //get metric name
-        Object nameObj = action.get("name");
-        if (nameObj == null || !(nameObj instanceof String) ) {
-            throw new RuntimeException("Receive Metrics name is null or is not String");
-        }
-        String name = (String) nameObj;
+        String name = shellMsg.getMetricName();
         if (name.isEmpty()) {
             throw new RuntimeException("Receive Metrics name is empty");
         }
@@ -115,7 +111,7 @@ public class ShellSpout implements ISpout {
         IShellMetric iShellMetric = (IShellMetric)iMetric;
         
         //call updateMetricFromRPC with params
-        Object paramsObj = action.get("params");
+        Object paramsObj = shellMsg.getMetricParams();
         try {
             iShellMetric.updateMetricFromRPC(paramsObj);
         } catch (RuntimeException re) {
@@ -151,7 +147,7 @@ public class ShellSpout implements ISpout {
                         _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
                     }
                 } else if (command.equals("metrics")) {
-                    handleMetrics(action);
+                    handleMetrics(shellMsg);
                 } else {
                     throw new RuntimeException("Unknown command received: " + command);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 50d6e20..b02a7fd 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -98,11 +98,10 @@ public class ShellBolt implements IBolt {
         }
         _rand = new Random();
         _collector = collector;
-<<<<<<< HEAD
+
         _context = context;
-=======
+
         _process = new ShellProcess(_command);
->>>>>>> upstream/master
 
         //subprocesses must send their pid first thing
         Number subpid = _process.launch(stormConf, context);
@@ -126,13 +125,9 @@ public class ShellBolt implements IBolt {
                             String msg = shellMsg.getMsg();
                             LOG.info("Shell msg: " + msg);
                         } else if (command.equals("emit")) {
-<<<<<<< HEAD
-                            handleEmit(action);
-                        } else if (command.equals("metrics")) {
-                            handleMetrics(action);
-=======
                             handleEmit(shellMsg);
->>>>>>> upstream/master
+                        } else if (command.equals("metrics")) {
+                            handleMetrics(shellMsg);
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -194,14 +189,8 @@ public class ShellBolt implements IBolt {
         _process.destroy();
         _inputs.clear();
     }
-<<<<<<< HEAD
-    
-    private void handleAck(Map action) {
-        String id = (String) action.get("id");
-=======
 
     private void handleAck(Object id) {
->>>>>>> upstream/master
         Tuple acked = _inputs.remove(id);
         if(acked==null) {
             throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
@@ -245,13 +234,9 @@ public class ShellBolt implements IBolt {
         }
     }
     
-    private void handleMetrics(Map action) {
+    private void handleMetrics(ShellMsg shellMsg) {
         //get metric name
-        Object nameObj = action.get("name");
-        if (nameObj == null || !(nameObj instanceof String) ) {
-            throw new RuntimeException("Receive Metrics name is null or is not String");
-        }
-        String name = (String) nameObj;
+        String name = shellMsg.getMetricName();
         if (name.isEmpty()) {
             throw new RuntimeException("Receive Metrics name is empty");
         }
@@ -267,7 +252,7 @@ public class ShellBolt implements IBolt {
         IShellMetric iShellMetric = (IShellMetric)iMetric;
         
         //call updateMetricFromRPC with params
-        Object paramsObj = action.get("params");
+        Object paramsObj = shellMsg.getMetricParams();
         try {
             iShellMetric.updateMetricFromRPC(paramsObj);
         } catch (RuntimeException re) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index 5de3c0d..a4c8c2c 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -33,7 +33,7 @@ json_decode = lambda x: json.loads(x)
 def readMsg():
     msg = ""
     while True:
-        line = sys.stdin.readline()[0:-1]
+        line = sys.stdin.readline()
 	if not line:
 	    raise Exception('Read EOF from stdin')
         if line[0:-1] == "end":


[02/24] git commit: modify java reflection

Posted by bo...@apache.org.
modify java reflection


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ecb39a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ecb39a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ecb39a70

Branch: refs/heads/master
Commit: ecb39a70f157a5d08a0fb90d0d3319b1c9a71ceb
Parents: 7a21ae0
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:01:50 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:01:50 2014 +0800

----------------------------------------------------------------------
 .../metric/api/rpc/CombinedShellMetric.java     | 14 ++--
 .../storm/metric/api/rpc/CountShellMetric.java  | 18 ++---
 .../storm/metric/api/rpc/IShellMetric.java      |  8 +-
 .../metric/api/rpc/ReducedShellMetric.java      | 12 +--
 .../jvm/backtype/storm/spout/ShellSpout.java    | 84 ++++++++------------
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 84 ++++++++------------
 .../backtype/storm/task/TopologyContext.java    | 38 +++++----
 7 files changed, 113 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
index fd940a7..231c571 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -21,13 +21,11 @@ import backtype.storm.metric.api.CombinedMetric;
 import backtype.storm.metric.api.ICombiner;
 
 public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+    public CombinedShellMetric(ICombiner combiner) {
+        super(combiner);
+    }
 
-	public CombinedShellMetric(ICombiner combiner) {
-		super(combiner);
-	}
-
-	public void updateMetricFromRPC(Object value) {
-		update(value);
-	}
-
+    public void updateMetricFromRPC(Object value) {
+        update(value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
index 1779223..def74c2 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -26,13 +26,13 @@ public class CountShellMetric extends CountMetric implements IShellMetric {
      *  if value is null, it will call incr()
      *  if value is long, it will call incrBy((long)params)
      * */
-	public void updateMetricFromRPC(Object value) {
-		if (value == null) {
-			incr();
-		} else if (value instanceof Long) {
-			incrBy((Long)value);
-		} else {
-			throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
-		}
-	}
+    public void updateMetricFromRPC(Object value) {
+        if (value == null) {
+            incr();
+        } else if (value instanceof Long) {
+            incrBy((Long)value);
+        } else {
+            throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
index 9bec3a1..d53baea 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -20,14 +20,12 @@ package backtype.storm.metric.api.rpc;
 import backtype.storm.metric.api.IMetric;
 
 public interface IShellMetric extends IMetric {
-	public static final String SHELL_METRICS_UPDATE_METHOD_NAME = "updateMetricFromRPC";
-	
     /***
      * @function
-     * 	This interface is used by ShellBolt and ShellSpout through RPC call to update Metric 
+     *     This interface is used by ShellBolt and ShellSpout through RPC call to update Metric 
      * @param
-     * 	value used to update metric, its's meaning change according implementation
-     *  Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
+     *     value used to update metric, its's meaning change according implementation
+     *     Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
      * */
     public void updateMetricFromRPC(Object value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
index 727a709..097ed51 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -22,11 +22,11 @@ import backtype.storm.metric.api.ReducedMetric;
 
 public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
 
-	public ReducedShellMetric(IReducer reducer) {
-		super(reducer);
-	}
+    public ReducedShellMetric(IReducer reducer) {
+        super(reducer);
+    }
 
-	public void updateMetricFromRPC(Object value) {
-		update(value);
-	}
+    public void updateMetricFromRPC(Object value) {
+        update(value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index e0abe8a..c6443e4 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,6 +18,7 @@
 package backtype.storm.spout;
 
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
@@ -41,8 +42,8 @@ public class ShellSpout implements ISpout {
     private String[] _command;
     private ShellProcess _process;
     
-    private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
-
+    private TopologyContext _context;
+    
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
@@ -55,6 +56,7 @@ public class ShellSpout implements ISpout {
                      SpoutOutputCollector collector) {
         _process = new ShellProcess(_command);
         _collector = collector;
+        _context = context;
 
         try {
             Number subpid = _process.launch(stormConf, context);
@@ -101,46 +103,35 @@ public class ShellSpout implements ISpout {
     }
     
     private void handleMetrics(Map action) {
-    	//get metrics
-    	Object nameObj = action.get("name");
-    	if ( !(nameObj instanceof String) ) {
-    		throw new RuntimeException("Receive Metrics name is not String");
-    	}
-    	String name = (String) nameObj;
-    	if (name == null || name.isEmpty()) {
-    		throw new RuntimeException("Receive Metrics name is NULL");
-    	}
-    	if ( !_registeredShellMetrics.containsKey(name)) {
-    		throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
-    	}
-    	IShellMetric iMetric = _registeredShellMetrics.get(name);
-    	
-    	//get paramList
-    	Object paramsObj = action.get("params");
+        //get metric name
+        Object nameObj = action.get("name");
+        if (nameObj == null || !(nameObj instanceof String) ) {
+            throw new RuntimeException("Receive Metrics name is null or is not String");
+        }
+        String name = (String) nameObj;
+        if (name.isEmpty()) {
+            throw new RuntimeException("Receive Metrics name is empty");
+        }
         
-    	Class<? extends IShellMetric> oriClass = iMetric.getClass();
-    	Method method = null;
-		try {
-			method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
-		} catch (SecurityException e) {
-			LOG.error("handleMetrics get method ["+name+"] SecurityException");
-			throw new RuntimeException(e);
-		} catch (NoSuchMethodException e) {
-			LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
-			throw new RuntimeException(e);
-		}
-    	try {
-			method.invoke(iMetric, paramsObj);
-		} catch (IllegalArgumentException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
-			throw new RuntimeException(e);
-		} catch (InvocationTargetException e) {
-			LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
-			throw new RuntimeException(e);
-		}
+        //get metric by name
+        IMetric iMetric = _context.getRegisteredMetricByName(name);
+        if (iMetric == null) {
+            throw new RuntimeException("Not find metric by name["+name+"] ");
+        }
+        if ( !(iMetric instanceof IShellMetric)) {
+            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+        }
+        IShellMetric iShellMetric = (IShellMetric)iMetric;
+        
+        //call updateMetricFromRPC with params
+        Object paramsObj = action.get("params");
+        try {
+            iShellMetric.updateMetricFromRPC(paramsObj);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }       
     }
 
     private void querySubprocess(Object query) {
@@ -171,22 +162,13 @@ public class ShellSpout implements ISpout {
                         _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
                     }
                 } else if (command.equals("metrics")) {
-                	handleMetrics(action);
+                    handleMetrics(action);
                 }
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
-    
-    public <T extends IShellMetric> T registerMetric(String name, T metric) {
-    	if ( _registeredShellMetrics.containsKey(name) ) {
-    		throw new RuntimeException("The same metric name `" + name + "` was registered in ShellSpout twice." );
-    	} else {
-    		_registeredShellMetrics.put(name, metric);
-    	}
-    	return metric;
-    }
 
     @Override
     public void activate() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index e6cbe5e..b2bdc22 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,6 +19,7 @@ package backtype.storm.task;
 
 import backtype.storm.Config;
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
@@ -81,9 +82,9 @@ public class ShellBolt implements IBolt {
     
     private Thread _readerThread;
     private Thread _writerThread;
-
-    private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
     
+    private TopologyContext _context;
+
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
@@ -101,6 +102,7 @@ public class ShellBolt implements IBolt {
         _rand = new Random();
         _process = new ShellProcess(_command);
         _collector = collector;
+        _context = context;
 
         try {
             //subprocesses must send their pid first thing
@@ -133,7 +135,7 @@ public class ShellBolt implements IBolt {
                         } else if (command.equals("emit")) {
                             handleEmit(action);
                         } else if (command.equals("metrics")) {
-                        	handleMetrics(action);
+                            handleMetrics(action);
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -192,15 +194,6 @@ public class ShellBolt implements IBolt {
         _process.destroy();
         _inputs.clear();
     }
-
-    public <T extends IShellMetric> T registerMetric(String name, T metric) {
-    	if ( _registeredShellMetrics.containsKey(name) ) {
-    		throw new RuntimeException("The same metric name `" + name + "` was registered in ShellBolt twice." );
-    	} else {
-    		_registeredShellMetrics.put(name, metric);
-    	}
-    	return metric;
-    }
     
     private void handleAck(Map action) {
         String id = (String) action.get("id");
@@ -256,46 +249,35 @@ public class ShellBolt implements IBolt {
     }
     
     private void handleMetrics(Map action) {
-    	//get metrics
-    	Object nameObj = action.get("name");
-    	if ( !(nameObj instanceof String) ) {
-    		throw new RuntimeException("Receive Metrics name is not String");
-    	}
-    	String name = (String) nameObj;
-    	if (name == null || name.isEmpty()) {
-    		throw new RuntimeException("Receive Metrics name is NULL");
-    	}
-    	if ( !_registeredShellMetrics.containsKey(name)) {
-    		throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
-    	}
-    	IShellMetric iMetric = _registeredShellMetrics.get(name);
-    	
-    	//get paramList
-    	Object paramsObj = action.get("params");
+        //get metric name
+        Object nameObj = action.get("name");
+        if (nameObj == null || !(nameObj instanceof String) ) {
+            throw new RuntimeException("Receive Metrics name is null or is not String");
+        }
+        String name = (String) nameObj;
+        if (name.isEmpty()) {
+            throw new RuntimeException("Receive Metrics name is empty");
+        }
         
-    	Class<? extends IShellMetric> oriClass = iMetric.getClass();
-    	Method method = null;
-		try {
-			method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
-		} catch (SecurityException e) {
-			LOG.error("handleMetrics get method ["+name+"] SecurityException");
-			throw new RuntimeException(e);
-		} catch (NoSuchMethodException e) {
-			LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
-			throw new RuntimeException(e);
-		}
-    	try {
-			method.invoke(iMetric, paramsObj);
-		} catch (IllegalArgumentException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
-			throw new RuntimeException(e);
-		} catch (InvocationTargetException e) {
-			LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
-			throw new RuntimeException(e);
-		}
+        //get metric by name
+        IMetric iMetric = _context.getRegisteredMetricByName(name);
+        if (iMetric == null) {
+            throw new RuntimeException("Not find metric by name["+name+"] ");
+        }
+        if ( !(iMetric instanceof IShellMetric)) {
+            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+        }
+        IShellMetric iShellMetric = (IShellMetric)iMetric;
+        
+        //call updateMetricFromRPC with params
+        Object paramsObj = action.get("params");
+        try {
+            iShellMetric.updateMetricFromRPC(paramsObj);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }       
     }
 
     private void die(Throwable exception) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 1285739..c0fcf20 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -252,23 +252,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
         return metric;
     }
-    
-    /*
-     * Convinience method for ShellBolt to registering ShellMetric.
-     */
-    public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellBolt bolt) {
-    	bolt.registerMetric(name, metric);
-        return registerMetric(name, metric, timeBucketSizeInSecs);
-    }
-    
-    /*
-     * Convinience method for ShellSpout to registering ShellMetric.
+
+    /**
+     * Get component's metric from registered metrics by name.
+     * Note: Normally, a component can register a given metric name only once.
+     *       However, registerMetric currently has a bug (https://issues.apache.org/jira/browse/STORM-254)
+     *       that allows the same metric name to be registered twice,
+     *       so we simply return the first matching metric we find.
      */
-    public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellSpout spout) {
-    	spout.registerMetric(name, metric);
-        return registerMetric(name, metric, timeBucketSizeInSecs);
-    }
+    public IMetric getRegisteredMetricByName(String name) {
+        IMetric metric = null;
 
+        for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+            Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
+            if (nameToMetric != null) {
+                metric = nameToMetric.get(name);
+                if (metric != null) {
+                    //we just return the first metric we meet
+                    break;  
+                }
+            }
+        } 
+        
+        return metric;
+    }   
+ 
     /*
      * Convinience method for registering ReducedMetric.
      */


[08/24] git commit: add cgroup support & storm.py bug fix

Posted by bo...@apache.org.
add cgroup support & storm.py bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ead42379
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ead42379
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ead42379

Branch: refs/heads/master
Commit: ead423793d392959f41c2782a05ed23413bc84a3
Parents: bc39172
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 18:48:06 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 18:48:06 2014 +0800

----------------------------------------------------------------------
 patch/STORM-132_PULL-36.patch | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ead42379/patch/STORM-132_PULL-36.patch
----------------------------------------------------------------------
diff --git a/patch/STORM-132_PULL-36.patch b/patch/STORM-132_PULL-36.patch
new file mode 100644
index 0000000..005ac26
--- /dev/null
+++ b/patch/STORM-132_PULL-36.patch
@@ -0,0 +1,31 @@
+From 6b275d95fbdbc8374a215ecb2551f0fca3438d81 Mon Sep 17 00:00:00 2001
+From: Kang Xiao <kx...@gmail.com>
+Date: Tue, 18 Feb 2014 23:23:50 +0800
+Subject: [PATCH] STORM-132 sort supervisor by free slot in desending order in
+ EvenScheduler to schedule more evenly between supervisor
+
+---
+ storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
+ 1 file changed, 6 insertions(+), 1 deletion(-)
+
+diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+index 28b9202..828606d 100644
+--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
++++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+@@ -22,7 +22,12 @@
+     :implements [backtype.storm.scheduler.IScheduler]))
+ 
+ (defn sort-slots [all-slots]
+-  (let [split-up (vals (group-by first all-slots))]
++  (let [split-up
++         (map second
++           (reverse
++             (sort
++               (for [[host ports] (group-by first all-slots)]
++                 [(count ports) ports]))))]
+     (apply interleave-all split-up)
+     ))
+ 
+-- 
+1.8.5.1
+


[17/24] git commit: change Multilang compitable with storm-incubating-v0.9.3 and add unit tests

Posted by bo...@apache.org.
change Multilang compitable with storm-incubating-v0.9.3 and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ecd0adb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ecd0adb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ecd0adb8

Branch: refs/heads/master
Commit: ecd0adb812defd0f11f77b3d2e046a0adf787079
Parents: d448e34
Author: dashengju <da...@qq.com>
Authored: Wed Jun 18 13:52:27 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Wed Jun 18 13:52:27 2014 +0800

----------------------------------------------------------------------
 .../src/dev/resources/tester_bolt_metrics.py    | 35 +++++++++
 .../src/dev/resources/tester_spout_metrics.py   | 53 ++++++++++++++
 .../metric/api/rpc/AssignableShellMetric.java   | 30 ++++++++
 .../metric/api/rpc/CombinedShellMetric.java     | 31 ++++++++
 .../storm/metric/api/rpc/CountShellMetric.java  | 38 ++++++++++
 .../storm/metric/api/rpc/IShellMetric.java      | 31 ++++++++
 .../metric/api/rpc/ReducedShellMetric.java      | 32 +++++++++
 .../storm/multilang/JsonSerializer.java         | 10 +++
 .../jvm/backtype/storm/multilang/ShellMsg.java  | 20 ++++++
 .../jvm/backtype/storm/spout/ShellSpout.java    | 37 ++++++++++
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 38 ++++++++++
 .../backtype/storm/task/TopologyContext.java    | 24 +++++++
 .../storm/testing/PythonShellMetricsBolt.java   | 32 +++++++++
 .../storm/testing/PythonShellMetricsSpout.java  | 35 +++++++++
 storm-core/src/multilang/py/storm.py            |  3 +
 .../test/clj/backtype/storm/metrics_test.clj    | 74 ++++++++++++++++++--
 16 files changed, 519 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/dev/resources/tester_bolt_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_bolt_metrics.py b/storm-core/src/dev/resources/tester_bolt_metrics.py
new file mode 100644
index 0000000..13a2d18
--- /dev/null
+++ b/storm-core/src/dev/resources/tester_bolt_metrics.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This Python file uses the following encoding: utf-8
+
+import storm
+from random import random
+
+class TesterMetricsBolt(storm.Bolt):
+    def initialize(self, conf, context):
+        storm.log('bolt initializing')
+
+    def process(self, tup):
+        word = tup.values[0];
+        
+        storm.rpcMetrics("my-custom-shell-metric", 1);
+        
+        storm.ack(tup)
+
+TesterMetricsBolt().run()

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/dev/resources/tester_spout_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout_metrics.py b/storm-core/src/dev/resources/tester_spout_metrics.py
new file mode 100644
index 0000000..a91128a
--- /dev/null
+++ b/storm-core/src/dev/resources/tester_spout_metrics.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This Python file uses the following encoding: utf-8
+
+import storm
+from random import choice
+from time import sleep
+from uuid import uuid4
+
+words = [u"nathan", u"mike", u"jackson", u"golda", u"bertels人"]
+
+class TesterSpout(storm.Spout):
+    def initialize(self, conf, context):
+        storm.log('spout initializing')
+        self.pending = {}
+        self.count = 0
+
+    def nextTuple(self):
+        sleep(1)
+        storm.log("TesterSpout emit a tuple")
+        word = choice(words)
+        id = str(uuid4())
+        self.pending[id] = word
+        if self.count < 2:
+            storm.rpcMetrics("my-custom-shellspout-metric", 1)
+            storm.log("TesterSpout update my-custom-shellspout-metric")
+            self.count = self.count + 1
+        storm.emit([word], id=id)
+
+    def ack(self, id):
+        del self.pending[id]
+
+    def fail(self, id):
+        storm.log("emitting " + self.pending[id] + " on fail")
+        storm.emit([self.pending[id]], id=id)
+
+TesterSpout().run()

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
new file mode 100644
index 0000000..20387ed
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.AssignableMetric;
+
+public class AssignableShellMetric extends AssignableMetric implements IShellMetric {
+    public AssignableShellMetric(Object value) {
+        super(value);
+    }
+
+    public void updateMetricFromRPC(Object value) {
+        setValue(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
new file mode 100644
index 0000000..231c571
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+
+public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+    public CombinedShellMetric(ICombiner combiner) {
+        super(combiner);
+    }
+
+    public void updateMetricFromRPC(Object value) {
+        update(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
new file mode 100644
index 0000000..def74c2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CountMetric;
+
+public class CountShellMetric extends CountMetric implements IShellMetric {
+    /***
+     * @param value
+     *  value should be null or a Long
+     *  if value is null, it will call incr()
+     *  if value is a Long, it will call incrBy((Long)value)
+     * */
+    public void updateMetricFromRPC(Object value) {
+        if (value == null) {
+            incr();
+        } else if (value instanceof Long) {
+            incrBy((Long)value);
+        } else {
+            throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
new file mode 100644
index 0000000..d53baea
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IMetric;
+
+public interface IShellMetric extends IMetric {
+    /***
+     * This method is called by ShellBolt and ShellSpout, via an RPC call from the
+     *     shell component, to update the metric.
+     * @param value
+     *     the value used to update the metric; its meaning depends on the implementation.
+     *     It can be any JSON-supported type: String, Long, Double, Boolean, null, List, Map
+     * */
+    public void updateMetricFromRPC(Object value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
new file mode 100644
index 0000000..097ed51
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
+
+    public ReducedShellMetric(IReducer reducer) {
+        super(reducer);
+    }
+
+    public void updateMetricFromRPC(Object value) {
+        update(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index d94464e..4d3c3f8 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -135,6 +135,16 @@ public class JsonSerializer implements ISerializer {
                 shellMsg.addAnchor((String) o);
             }
         }
+       
+        Object nameObj = msg.get("name"); 
+        String metricName = null;
+        if (nameObj != null && nameObj instanceof String) {
+            metricName = (String) nameObj;
+        }
+        shellMsg.setMetricName(metricName);
+        
+        Object paramsObj = msg.get("params");
+        shellMsg.setMetricParams(paramsObj); 
 
         return shellMsg;
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
index b78a960..ed803c4 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -42,6 +42,10 @@ public class ShellMsg {
     private List<Object> tuple;
     private boolean needTaskIds;
 
+    //metrics rpc 
+    private String metricName;
+    private Object metricParams;
+
     public String getCommand() {
         return command;
     }
@@ -119,4 +123,20 @@ public class ShellMsg {
     public void setNeedTaskIds(boolean needTaskIds) {
         this.needTaskIds = needTaskIds;
     }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public String getMetricName() {
+        return this.metricName;
+    }
+
+    public void setMetricParams(Object metricParams) {
+        this.metricParams = metricParams;
+    }
+
+    public Object getMetricParams() {
+        return metricParams;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index d6a18e7..9f4b38e 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,6 +18,8 @@
 package backtype.storm.spout;
 
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.multilang.ShellMsg;
 import backtype.storm.multilang.SpoutMsg;
 import backtype.storm.task.TopologyContext;
@@ -25,6 +27,7 @@ import backtype.storm.utils.ShellProcess;
 import java.util.Map;
 import java.util.List;
 import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +38,9 @@ public class ShellSpout implements ISpout {
     private SpoutOutputCollector _collector;
     private String[] _command;
     private ShellProcess _process;
+    
+    private TopologyContext _context;
+    
     private SpoutMsg _spoutMsg;
 
     public ShellSpout(ShellComponent component) {
@@ -48,6 +54,7 @@ public class ShellSpout implements ISpout {
     public void open(Map stormConf, TopologyContext context,
                      SpoutOutputCollector collector) {
         _collector = collector;
+        _context = context;
 
         _process = new ShellProcess(_command);
 
@@ -85,6 +92,34 @@ public class ShellSpout implements ISpout {
         _spoutMsg.setId(msgId);
         querySubprocess();
     }
+    
+    private void handleMetrics(ShellMsg shellMsg) {
+        //get metric name
+        String name = shellMsg.getMetricName();
+        if (name.isEmpty()) {
+            throw new RuntimeException("Receive Metrics name is empty");
+        }
+        
+        //get metric by name
+        IMetric iMetric = _context.getRegisteredMetricByName(name);
+        if (iMetric == null) {
+            throw new RuntimeException("Not find metric by name["+name+"] ");
+        }
+        if ( !(iMetric instanceof IShellMetric)) {
+            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+        }
+        IShellMetric iShellMetric = (IShellMetric)iMetric;
+        
+        //call updateMetricFromRPC with params
+        Object paramsObj = shellMsg.getMetricParams();
+        try {
+            iShellMetric.updateMetricFromRPC(paramsObj);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }       
+    }
 
     private void querySubprocess() {
         try {
@@ -111,6 +146,8 @@ public class ShellSpout implements ISpout {
                     } else {
                         _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
                     }
+                } else if (command.equals("metrics")) {
+                    handleMetrics(shellMsg);
                 } else {
                     throw new RuntimeException("Unknown command received: " + command);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 81aca02..b02a7fd 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,6 +19,9 @@ package backtype.storm.task;
 
 import backtype.storm.Config;
 import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
+import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.ShellProcess;
 import backtype.storm.multilang.BoltMsg;
@@ -76,6 +79,8 @@ public class ShellBolt implements IBolt {
 
     private Thread _readerThread;
     private Thread _writerThread;
+    
+    private TopologyContext _context;
 
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -93,6 +98,9 @@ public class ShellBolt implements IBolt {
         }
         _rand = new Random();
         _collector = collector;
+
+        _context = context;
+
         _process = new ShellProcess(_command);
 
         //subprocesses must send their pid first thing
@@ -118,6 +126,8 @@ public class ShellBolt implements IBolt {
                             LOG.info("Shell msg: " + msg);
                         } else if (command.equals("emit")) {
                             handleEmit(shellMsg);
+                        } else if (command.equals("metrics")) {
+                            handleMetrics(shellMsg);
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -223,6 +233,34 @@ public class ShellBolt implements IBolt {
                     shellMsg.getStream(), anchors, shellMsg.getTuple());
         }
     }
+    
+    private void handleMetrics(ShellMsg shellMsg) {
+        //get metric name
+        String name = shellMsg.getMetricName();
+        if (name.isEmpty()) {
+            throw new RuntimeException("Receive Metrics name is empty");
+        }
+        
+        //get metric by name
+        IMetric iMetric = _context.getRegisteredMetricByName(name);
+        if (iMetric == null) {
+            throw new RuntimeException("Not find metric by name["+name+"] ");
+        }
+        if ( !(iMetric instanceof IShellMetric)) {
+            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+        }
+        IShellMetric iShellMetric = (IShellMetric)iMetric;
+        
+        //call updateMetricFromRPC with params
+        Object paramsObj = shellMsg.getMetricParams();
+        try {
+            iShellMetric.updateMetricFromRPC(paramsObj);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }       
+    }
 
     private void die(Throwable exception) {
         _exception = exception;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 10e630c..356edb3 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -271,6 +271,30 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         return metric;
     }
 
+    /**
+     * Gets one of this component's registered metrics by name.
+     * Note: normally a component can register a given metric name only once,
+     *       but registerMetric currently has a bug (https://issues.apache.org/jira/browse/STORM-254)
+     *       that allows the same metric name to be registered twice,
+     *       so we simply return the first matching metric we find.
+     */
+    public IMetric getRegisteredMetricByName(String name) {
+        IMetric metric = null;
+
+        for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+            Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
+            if (nameToMetric != null) {
+                metric = nameToMetric.get(name);
+                if (metric != null) {
+                    //we just return the first metric we meet
+                    break;  
+                }
+            }
+        } 
+        
+        return metric;
+    }   
+ 
     /*
      * Convinience method for registering ReducedMetric.
      */

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java
new file mode 100644
index 0000000..646d621
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java
@@ -0,0 +1,32 @@
+package backtype.storm.testing;
+
+import java.util.Map;
+
+import backtype.storm.metric.api.rpc.CountShellMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
+	private static final long serialVersionUID = 1999209252187463355L;
+	
+	public PythonShellMetricsBolt(String[] command) {
+		super(command);
+	}
+
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		super.prepare(stormConf, context, collector);
+		
+		CountShellMetric cMetric = new CountShellMetric();
+		context.registerMetric("my-custom-shell-metric", cMetric, 5);
+	}
+	
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+	}
+
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java
new file mode 100644
index 0000000..04bd4ae
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java
@@ -0,0 +1,35 @@
+package backtype.storm.testing;
+
+import java.util.Map;
+
+import backtype.storm.metric.api.rpc.CountShellMetric;
+import backtype.storm.spout.ShellSpout;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
+	private static final long serialVersionUID = 1999209252187463355L;
+
+	public PythonShellMetricsSpout(String[] command) {
+		super(command);
+	}
+	
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		super.open(conf, context, collector);
+	
+		CountShellMetric cMetric = new CountShellMetric();
+		context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("field1"));
+	}
+
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index bec3f0c..9965c81 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -136,6 +136,9 @@ def reportError(msg):
 def log(msg):
     sendMsgToParent({"command": "log", "msg": msg})
 
+def rpcMetrics(name, params):
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
 def initComponent():
     setupInfo = readMsg()
     sendpid(setupInfo['pidDir'])

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index 2f5b0e1..4aaa3dc 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -18,8 +18,12 @@
   (:import [backtype.storm.topology TopologyBuilder])
   (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus])
   (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
-            TestAggregatesCounter TestConfBolt AckFailMapTracker])
+            TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout])
+  (:import [backtype.storm.task ShellBolt])
+  (:import [backtype.storm.spout ShellSpout])
   (:import [backtype.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
+  (:import [backtype.storm.metric.api.rpc CountShellMetric])
+  (:import [backtype.storm.utils Utils])
   
   (:use [backtype.storm bootstrap testing])
   (:use [backtype.storm.daemon common])
@@ -96,7 +100,10 @@
 (deftest test-custom-metric
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           "storm.zookeeper.connection.timeout" 30000
+                           "storm.zookeeper.session.timeout" 60000
+                           }]
     (let [feeder (feeder-spout ["field1"])
           topology (thrift/mk-topology
                     {"1" (thrift/mk-spout-spec feeder)}
@@ -121,12 +128,15 @@
 (deftest test-custom-metric-with-multi-tasks
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           "storm.zookeeper.connection.timeout" 30000
+                           "storm.zookeeper.session.timeout" 60000
+                           }]
     (let [feeder (feeder-spout ["field1"])
           topology (thrift/mk-topology
                      {"1" (thrift/mk-spout-spec feeder)}
                      {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
-      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+      (submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
@@ -143,6 +153,62 @@
       (advance-cluster-time cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
 
+(defn mk-shell-bolt-with-metrics-spec
+  [inputs command & kwargs]
+  (let [command (into-array String command)]
+    (apply thrift/mk-bolt-spec inputs
+         (PythonShellMetricsBolt. command) kwargs)))
+
+(deftest test-custom-metric-with-multilang-py
+  (with-local-cluster 
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                       [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                       "storm.zookeeper.connection.timeout" 30000
+                       "storm.zookeeper.session.timeout" 60000
+                       }]
+    (let [feeder (feeder-spout ["field1"])
+          topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec feeder)}
+                     {"2" (mk-shell-bolt-with-metrics-spec {"1" :global} ["python" "tester_bolt_metrics.py"])})]
+      (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
+
+      (.feed feeder ["a"] 1)
+      (Thread/sleep 6000)
+      (assert-buckets! "2" "my-custom-shell-metric" [1])
+            
+      (Thread/sleep 5000)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0])
+
+      (Thread/sleep 20000)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0])
+      
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)               
+      (Thread/sleep 5000)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2])
+      )))
+
+(defn mk-shell-spout-with-metrics-spec
+  [command & kwargs]
+  (let [command (into-array String command)]
+    (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
+
+(deftest test-custom-metric-with-spout-multilang-py
+  (with-local-cluster 
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                       [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                       "storm.zookeeper.connection.timeout" 30000
+                       "storm.zookeeper.session.timeout" 60000}]
+    (let [topology (thrift/mk-topology
+                     {"1" (mk-shell-spout-with-metrics-spec ["python" "tester_spout_metrics.py"])}
+                     {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
+      (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
+
+      (Thread/sleep 7000)
+      (assert-buckets! "1" "my-custom-shellspout-metric" [2])
+      )))
+
+
 (deftest test-builtin-metrics-1
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER                    


[09/24] git commit: add cgroup support & storm.py bug fix

Posted by bo...@apache.org.
add cgroup support & storm.py bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0279d725
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0279d725
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0279d725

Branch: refs/heads/master
Commit: 0279d725cb6ace2e4f4cb64d48ef236a97f0d785
Parents: ead4237
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 18:48:47 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 18:48:47 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/supervisor.clj       |  3 ++-
 .../src/clj/backtype/storm/scheduler/EvenScheduler.clj |  7 ++++++-
 storm-core/src/jvm/backtype/storm/Config.java          |  6 ++++++
 storm-core/src/multilang/py/storm.py                   | 13 +++++++++++--
 4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 7566a79..556653a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -474,7 +474,8 @@
                                   (substitute-worker-childopts s port))
           logfilename (str "worker-" port ".log")
           command (concat
-                    [(java-cmd) "-server"]
+                    (conf WORKER-CHILDCGROUP)
+		    [(java-cmd) "-server"]
                     worker-childopts
                     topo-worker-childopts
                     [(str "-Djava.library.path=" jlp)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 28b9202..828606d 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,7 +22,12 @@
     :implements [backtype.storm.scheduler.IScheduler]))
 
 (defn sort-slots [all-slots]
-  (let [split-up (vals (group-by first all-slots))]
+  (let [split-up
+         (map second
+           (reverse
+             (sort
+               (for [[host ports] (group-by first all-slots)]
+                 [(count ports) ports]))))]
     (apply interleave-all split-up)
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ff309a5..531fa14 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,6 +475,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
+     * The cgroup opts provided to workers launched by this supervisor.
+     */
+    public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
+    public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
+
+    /**
      * control how many worker receiver threads we need per worker
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index c0387b4..5de3c0d 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -34,9 +34,11 @@ def readMsg():
     msg = ""
     while True:
         line = sys.stdin.readline()[0:-1]
-        if line == "end":
+	if not line:
+	    raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
             break
-        msg = msg + line + "\n"
+        msg = msg + line
     return json_decode(msg[0:-1])
 
 MODE = None
@@ -180,6 +182,13 @@ class BasicBolt(object):
     def initialize(self, stormconf, context):
         pass
 
+    def redirect_stdout_to_stderr(self):
+        self.bakup_stdout = sys.stdout
+        sys.stdout = sys.stderr
+
+    def recover_stdout(self):
+        sys.stdout = self.bakup_stdout
+
     def process(self, tuple):
         pass
 


[24/24] git commit: Added STORM-200 to Changelog and Readme

Posted by bo...@apache.org.
Added STORM-200 to Changelog and Readme


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8da9572a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8da9572a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8da9572a

Branch: refs/heads/master
Commit: 8da9572ac7d12e843ce8579c9e5629a528a3d0ea
Parents: ff345c1
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 1 15:38:41 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 1 15:38:41 2014 -0500

----------------------------------------------------------------------
 CHANGELOG.md    | 1 +
 README.markdown | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8da9572a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79fff25..7e37996 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
  * STORM-372: Typo in storm_env.ini
  * STORM-266: Adding shell process pid and name in the log message
  * STORM-367: Storm UI REST api documentation.
+ * STORM-200: Proposal for Multilang's Metrics feature
 
 ## 0.9.2-incubating
  * STORM-66: send taskid on initial handshake

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8da9572a/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index de16a4c..f026e6d 100644
--- a/README.markdown
+++ b/README.markdown
@@ -148,6 +148,7 @@ under the License.
 * Krystian Nowak ([@krystiannowak](https://github.com/krystiannowak))
 * Parth-Brahmbhatt ([@Parth-Brahmbhatt](https://github.com/Parth-Brahmbhatt))
 * Adrian Petrescu ([@apetresc](https://github.com/apetresc))
+* DashengJu ([@dashengju](https://github.com/dashengju))
 
 ## Acknowledgements
 


[15/24] git commit: remove other code and only multilang metrics

Posted by bo...@apache.org.
remove other code and only multilang metrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a4b26af6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a4b26af6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a4b26af6

Branch: refs/heads/master
Commit: a4b26af6044b3774fec6224a74b8b3f2b994d535
Parents: 22a6ca9
Author: JuDasheng <ju...@meituan.com>
Authored: Fri Jun 13 11:49:42 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Fri Jun 13 11:49:42 2014 +0800

----------------------------------------------------------------------
 build.sh                                        |   9 -
 examples/storm-starter/pom.xml                  |   2 +-
 external/storm-kafka/pom.xml                    |   2 +-
 patch/STORM-132_PULL-36.patch                   |  31 --
 pom.xml                                         |  20 +-
 .../maven-shade-clojure-transformer/pom.xml     |   4 +-
 storm-core/dependency-reduced-pom.xml           | 359 -------------------
 storm-core/pom.xml                              |   4 -
 .../src/clj/backtype/storm/daemon/executor.clj  |  67 +---
 .../clj/backtype/storm/daemon/supervisor.clj    |   5 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   4 +-
 .../backtype/storm/scheduler/EvenScheduler.clj  |   7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  74 ++--
 storm-core/src/jvm/backtype/storm/Config.java   |   6 -
 storm-core/src/multilang/py/storm.py            |  17 +-
 storm-core/src/ui/public/topology.html          |   6 +-
 storm-dist/binary/pom.xml                       |   4 -
 17 files changed, 72 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/build.sh
----------------------------------------------------------------------
diff --git a/build.sh b/build.sh
deleted file mode 100755
index 2563c33..0000000
--- a/build.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-
-mvn clean install -DskipTests=true 
-
-cd storm-dist/binary 
-
-mvn package
-
-cd -

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index b56aa92..903c6e7 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
   <parent>
       <artifactId>storm</artifactId>
       <groupId>org.apache.storm</groupId>
-      <version>0.9.2-incubating-mt0000</version>
+      <version>0.9.3-incubating-SNAPSHOT</version>
       <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index b7aaccc..4972619 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.2-incubating-mt0000</version>
+        <version>0.9.3-incubating-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/patch/STORM-132_PULL-36.patch
----------------------------------------------------------------------
diff --git a/patch/STORM-132_PULL-36.patch b/patch/STORM-132_PULL-36.patch
deleted file mode 100644
index 005ac26..0000000
--- a/patch/STORM-132_PULL-36.patch
+++ /dev/null
@@ -1,31 +0,0 @@
-From 6b275d95fbdbc8374a215ecb2551f0fca3438d81 Mon Sep 17 00:00:00 2001
-From: Kang Xiao <kx...@gmail.com>
-Date: Tue, 18 Feb 2014 23:23:50 +0800
-Subject: [PATCH] STORM-132 sort supervisor by free slot in desending order in
- EvenScheduler to schedule more evenly between supervisor
-
----
- storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
- 1 file changed, 6 insertions(+), 1 deletion(-)
-
-diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-index 28b9202..828606d 100644
---- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-@@ -22,7 +22,12 @@
-     :implements [backtype.storm.scheduler.IScheduler]))
- 
- (defn sort-slots [all-slots]
--  (let [split-up (vals (group-by first all-slots))]
-+  (let [split-up
-+         (map second
-+           (reverse
-+             (sort
-+               (for [[host ports] (group-by first all-slots)]
-+                 [(count ports) ports]))))]
-     (apply interleave-all split-up)
-     ))
- 
--- 
-1.8.5.1
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c07a471..b7286dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,17 +18,16 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
-    <!--
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
         <version>10</version>
     </parent>
-    -->
+
 
     <groupId>org.apache.storm</groupId>
     <artifactId>storm</artifactId>
-    <version>0.9.2-incubating-mt0000</version>
+    <version>0.9.3-incubating-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>Storm</name>
     <description>Distributed and fault-tolerant realtime computation</description>
@@ -282,16 +281,11 @@
     </profiles>
 
     <distributionManagement>
-        <repository>
-            <id>nexus-releases</id>
-            <name>nexus-releases</name>
-            <url>http://nexus.sankuai.com:8081/nexus/content/repositories/releases</url>
-        </repository>
-        <snapshotRepository>
-            <id>nexus-snapshots</id>
-            <name>nexus-snapshots</name>
-            <url>http://nexus.sankuai.com:8081/nexus/content/repositories/snapshots</url>
-        </snapshotRepository>
+        <site>
+            <id>storm.maven.website</id>
+            <name>Storm Website</name>
+            <url>file:///tmp/site</url>
+        </site>
     </distributionManagement>
 
     <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-buildtools/maven-shade-clojure-transformer/pom.xml
----------------------------------------------------------------------
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 1c944d5..a6fbad1 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.2-incubating-mt0000</version>
+        <version>0.9.3-incubating-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
@@ -36,4 +36,4 @@
             <scope>provided</scope>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/dependency-reduced-pom.xml b/storm-core/dependency-reduced-pom.xml
deleted file mode 100644
index 9dacd73..0000000
--- a/storm-core/dependency-reduced-pom.xml
+++ /dev/null
@@ -1,359 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <parent>
-    <artifactId>storm</artifactId>
-    <groupId>org.apache.storm</groupId>
-    <version>0.9.2-incubating-mt0000</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.storm</groupId>
-  <artifactId>storm-core</artifactId>
-  <name>Storm Core</name>
-  <description>Storm Core Java API and Clojure implementation.</description>
-  <build>
-    <sourceDirectory>src/jvm</sourceDirectory>
-    <resources>
-      <resource>
-        <directory>../conf</directory>
-      </resource>
-      <resource>
-        <targetPath>META-INF</targetPath>
-        <directory>../</directory>
-        <includes>
-          <include>NOTICE</include>
-        </includes>
-      </resource>
-    </resources>
-    <testResources>
-      <testResource>
-        <directory>src/dev</directory>
-      </testResource>
-      <testResource>
-        <directory>test/resources</directory>
-      </testResource>
-    </testResources>
-    <plugins>
-      <plugin>
-        <groupId>com.theoryinpractise</groupId>
-        <artifactId>clojure-maven-plugin</artifactId>
-        <extensions>true</extensions>
-        <executions>
-          <execution>
-            <id>compile-clojure</id>
-            <phase>compile</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>test-clojure</id>
-            <phase>test</phase>
-            <goals>
-              <goal>test-with-junit</goal>
-            </goals>
-            <configuration>
-              <vmargs>${test.extra.args}</vmargs>
-            </configuration>
-          </execution>
-        </executions>
-        <configuration>
-          <sourceDirectories>
-            <sourceDirectory>src/clj</sourceDirectory>
-          </sourceDirectories>
-          <testSourceDirectories>
-            <testSourceDirectory>test/clj</testSourceDirectory>
-          </testSourceDirectories>
-          <warnOnReflection>false</warnOnReflection>
-          <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
-          <copiedNamespaces>
-            <copiedNamespace>none</copiedNamespace>
-          </copiedNamespaces>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-surefire-report-plugin</artifactId>
-        <configuration>
-          <reportsDirectories>
-            <file>${project.build.directory}/test-reports</file>
-          </reportsDirectories>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>2.2</version>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-          </execution>
-        </executions>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>maven-shade-clojure-transformer</artifactId>
-            <version>${project.version}</version>
-          </dependency>
-        </dependencies>
-        <configuration>
-          <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
-          <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
-          <createDependencyReducedPom>true</createDependencyReducedPom>
-          <minimizeJar>false</minimizeJar>
-          <artifactSet>
-            <includes>
-              <include>org.apache.thrift:*</include>
-              <include>org.apache.storm:*</include>
-            </includes>
-          </artifactSet>
-          <relocations>
-            <relocation>
-              <pattern>org.apache.thrift</pattern>
-              <shadedPattern>org.apache.thrift7</shadedPattern>
-            </relocation>
-          </relocations>
-          <transformers>
-            <transformer />
-          </transformers>
-          <filters>
-            <filter>
-              <artifact>org.apache.thrift:*</artifact>
-              <excludes>
-                <exclude>META-INF/LICENSE.txt</exclude>
-                <exclude>META-INF/NOTICE.txt</exclude>
-              </excludes>
-            </filter>
-          </filters>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>org.clojure</groupId>
-      <artifactId>clojure</artifactId>
-      <version>1.5.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>clj-time</groupId>
-      <artifactId>clj-time</artifactId>
-      <version>0.4.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>compojure</groupId>
-      <artifactId>compojure</artifactId>
-      <version>1.1.3</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>hiccup</groupId>
-      <artifactId>hiccup</artifactId>
-      <version>0.3.6</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>ring</groupId>
-      <artifactId>ring-devel</artifactId>
-      <version>0.3.11</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>ring</groupId>
-      <artifactId>ring-jetty-adapter</artifactId>
-      <version>0.3.11</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.clojure</groupId>
-      <artifactId>tools.logging</artifactId>
-      <version>0.2.3</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.clojure</groupId>
-      <artifactId>math.numeric-tower</artifactId>
-      <version>0.0.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.clojure</groupId>
-      <artifactId>tools.cli</artifactId>
-      <version>0.2.4</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.clojure</groupId>
-      <artifactId>tools.nrepl</artifactId>
-      <version>0.2.3</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>clojure</artifactId>
-          <groupId>org.clojure</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>clojure-complete</groupId>
-      <artifactId>clojure-complete</artifactId>
-      <version>0.2.3</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>clojure</artifactId>
-          <groupId>org.clojure</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>2.4</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-exec</artifactId>
-      <version>1.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.5</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libthrift</artifactId>
-      <version>0.7.0</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>slf4j-api</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>servlet-api</artifactId>
-          <groupId>javax.servlet</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-framework</artifactId>
-      <version>2.4.0</version>
-      <scope>compile</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>log4j</artifactId>
-          <groupId>log4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>slf4j-log4j12</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>com.googlecode.json-simple</groupId>
-      <artifactId>json-simple</artifactId>
-      <version>1.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>carbonite</artifactId>
-      <version>1.4.0</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.yaml</groupId>
-      <artifactId>snakeyaml</artifactId>
-      <version>1.11</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpclient</artifactId>
-      <version>4.3.3</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.googlecode.disruptor</groupId>
-      <artifactId>disruptor</artifactId>
-      <version>2.10.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.jgrapht</groupId>
-      <artifactId>jgrapht-core</artifactId>
-      <version>0.9.0</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>13.0</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-classic</artifactId>
-      <version>1.0.6</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>log4j-over-slf4j</artifactId>
-      <version>1.6.6</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-      <version>3.6.3.Final</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.clojars.runa</groupId>
-      <artifactId>conjure</artifactId>
-      <version>2.1.3</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>reply</groupId>
-      <artifactId>reply</artifactId>
-      <version>0.3.0</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>cd-client</artifactId>
-          <groupId>org.thnetos</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>drawbridge</artifactId>
-          <groupId>com.cemerick</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>versioneer</artifactId>
-          <groupId>trptcolin</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>sjacket</artifactId>
-          <groupId>org.clojars.trptcolin</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-</project>
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 91cd370..134eeb8 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,11 +20,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-<<<<<<< HEAD
-        <version>0.9.2-incubating-mt0000</version>
-=======
         <version>0.9.3-incubating-SNAPSHOT</version>
->>>>>>> upstream/master
     </parent>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 64e60be..1bbe53d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -18,7 +18,6 @@
   (:use [backtype.storm bootstrap])
   (:import [backtype.storm.hooks ITaskHook])
   (:import [backtype.storm.tuple Tuple])
-  (:import [backtype.storm.tuple MessageId])
   (:import [backtype.storm.spout ISpoutWaitStrategy])
   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -390,25 +389,12 @@
   (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
         task-ids (:task-ids executor-data)
         debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
-        component-id (:component-id executor-data)
-        executor-id (:executor-id executor-data)
-        executor-type (:type executor-data)
         ]
     (disruptor/clojure-handler
       (fn [tuple-batch sequence-id end-of-batch?]
         (fast-list-iter [[task-id msg] tuple-batch]
-          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))
-                tuple-streamid (.getSourceStreamId tuple)
-                tuple-source (.getSourceComponent tuple)
-                tuple-id (.getMessageId tuple)
-                tuple-values (.getValues tuple)
-                ]
-            (when debug? 
-              (if (= tuple-streamid "default")
-                (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-id "] TupleValue[" tuple-values "]")
-                (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-values "]")
-                )
-              )
+          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
+            (when debug? (log-message "Processing received message " tuple))
             (if task-id
               (tuple-action-fn task-id tuple)
               ;; null task ids are broadcast tuples
@@ -435,7 +421,6 @@
         last-active (atom false)        
         spouts (ArrayList. (map :object (vals task-datas)))
         rand (Random. (Utils/secureRandomLong))
-        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
         
         pending (RotatingMap.
                  2 ;; microoptimize for performance of .size method
@@ -443,12 +428,9 @@
                    (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
                      (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                        (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
-                       (when debug? 
-                         (log-message "Component[" component-id "] FAILED-TUPLE reason[EXPIRED] TupleId[" msg-id "] values[" tuple-info "]"))
                        ))))
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          (let [stream-id (.getSourceStreamId tuple)
-                                tuple-id (.getMessageId tuple)]
+                          (let [stream-id (.getSourceStreamId tuple)]
                             (condp = stream-id
                               Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
@@ -459,18 +441,10 @@
                                     (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
                                   (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                                     (condp = stream-id
-                                      ACKER-ACK-STREAM-ID (do 
-                                                            (ack-spout-msg executor-data (get task-datas task-id)
-                                                                            spout-id tuple-finished-info time-delta)
-                                                            (when debug?
-                                                              (log-message "Component[" component-id "] ACK-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
-                                                            )
-                                      ACKER-FAIL-STREAM-ID (do 
-                                                             (fail-spout-msg executor-data (get task-datas task-id)
-                                                                              spout-id tuple-finished-info time-delta)
-                                                             (when debug?
-                                                              (log-message "Component[" component-id "] FAILED-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
-                                                             )
+                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
+                                                                         spout-id tuple-finished-info time-delta)
+                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
+                                                                           spout-id tuple-finished-info time-delta)
                                       )))
                                 ;; TODO: on failure, emit tuple to failure stream
                                 ))))
@@ -519,8 +493,6 @@
                                                            (transfer-fn out-task
                                                                         out-tuple
                                                                         overflow-buffer)
-                                                           (when debug? 
-                                                             (log-message "Component[" component-id "] Type[EMIT] to Stream[" out-stream-id "] TupleId[" tuple-id "] values[" values "]"))
                                                            ))
                                          (if rooted?
                                            (do
@@ -626,8 +598,6 @@
         {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                 open-or-prepare-was-called?]} executor-data
         rand (Random. (Utils/secureRandomLong))
-        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
-
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           ;; synchronization needs to be done with a key provided by this bolt, otherwise:
                           ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -690,13 +660,7 @@
                                                     (tasks-fn task stream values)
                                                     (tasks-fn stream values))]
                                     (fast-list-iter [t out-tasks]
-                                                    (let [anchors-to-ids (HashMap.)
-                                                          out-tuple (TupleImpl. worker-context
-                                                                               values
-                                                                               task-id
-                                                                               stream
-                                                                               (MessageId/makeId anchors-to-ids))
-                                                          ]
+                                                    (let [anchors-to-ids (HashMap.)]
                                                       (fast-list-iter [^TupleImpl a anchors]
                                                                       (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
                                                                         (when (pos? (count root-ids))
@@ -705,15 +669,12 @@
                                                                             (fast-list-iter [root-id root-ids]
                                                                                             (put-xor! anchors-to-ids root-id edge-id))
                                                                             ))))
-                                                      (transfer-fn t out-tuple)
-                                                      (when debug? 
-                                                        (if (= component-id "__acker")
-                                                          (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.get values 0) "]")
-                                                          (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.getMessageId out-tuple) "] values[" values "]")
-                                                          )
-                                                        
-                                                        )
-                                                      ))
+                                                      (transfer-fn t
+                                                                   (TupleImpl. worker-context
+                                                                               values
+                                                                               task-id
+                                                                               stream
+                                                                               (MessageId/makeId anchors-to-ids)))))
                                     (or out-tasks [])))]]
           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
           (if (= component-id Constants/SYSTEM_COMPONENT_ID)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 53b2802..7566a79 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -473,11 +473,8 @@
           topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
                                   (substitute-worker-childopts s port))
           logfilename (str "worker-" port ".log")
-          worker-childcgroup (when-let [s (conf WORKER-CHILDCGROUP)] 
-                                (.split s " "))
           command (concat
-                    worker-childcgroup
-		    [(java-cmd) "-server"]
+                    [(java-cmd) "-server"]
                     worker-childopts
                     topo-worker-childopts
                     [(str "-Djava.library.path=" jlp)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 29756a1..3650150 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -131,7 +131,7 @@
         
     (fn ([^Integer out-task-id ^String stream ^List values]
           (when debug?
-            (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
+            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
           (let [target-component (.getComponentId worker-context out-task-id)
                 component->grouping (get stream->component->grouper stream)
                 grouping (get component->grouping target-component)
@@ -149,7 +149,7 @@
             ))
         ([^String stream ^List values]
            (when debug?
-             (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
+             (log-message "Emitting: " component-id " " stream " " values))
            (let [out-tasks (ArrayList.)]
              (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
                (when (= :direct grouper)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 828606d..28b9202 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,12 +22,7 @@
     :implements [backtype.storm.scheduler.IScheduler]))
 
 (defn sort-slots [all-slots]
-  (let [split-up
-         (map second
-           (reverse
-             (sort
-               (for [[host ports] (group-by first all-slots)]
-                 [(count ports) ports]))))]
+  (let [split-up (vals (group-by first all-slots))]
     (apply interleave-all split-up)
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 4098038..5f2bcba 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -780,43 +780,43 @@
        (let [id (url-decode id)
              component (url-decode component)]
          (json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
-  ;(POST "/api/v1/topology/:id/activate" [id]
-  ;  (with-nimbus nimbus
-  ;    (let [id (url-decode id)
-  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-  ;          name (.get_name tplg)]
-  ;      (.activate nimbus name)
-  ;      (log-message "Activating topology '" name "'")))
-  ;  (resp/redirect (str "/api/v1/topology/" id)))
-
-  ;(POST "/api/v1/topology/:id/deactivate" [id]
-  ;  (with-nimbus nimbus
-  ;    (let [id (url-decode id)
-  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-  ;          name (.get_name tplg)]
-  ;      (.deactivate nimbus name)
-  ;      (log-message "Deactivating topology '" name "'")))
-  ;  (resp/redirect (str "/api/v1/topology/" id)))
-  ;(POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
-  ;  (with-nimbus nimbus
-  ;    (let [id (url-decode id)
-  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-  ;          name (.get_name tplg)
-  ;          options (RebalanceOptions.)]
-  ;      (.set_wait_secs options (Integer/parseInt wait-time))
-  ;      (.rebalance nimbus name options)
-  ;      (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
-  ;  (resp/redirect (str "/api/v1/topology/" id)))
-  ;(POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
-  ;  (with-nimbus nimbus
-  ;    (let [id (url-decode id)
-  ;          tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-  ;          name (.get_name tplg)
-  ;          options (KillOptions.)]
-  ;      (.set_wait_secs options (Integer/parseInt wait-time))
-  ;      (.killTopologyWithOpts nimbus name options)
-  ;      (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
-  ;  (resp/redirect (str "/api/v1/topology/" id)))
+  (POST "/api/v1/topology/:id/activate" [id]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)]
+        (.activate nimbus name)
+        (log-message "Activating topology '" name "'")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+
+  (POST "/api/v1/topology/:id/deactivate" [id]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)]
+        (.deactivate nimbus name)
+        (log-message "Deactivating topology '" name "'")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+  (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)
+            options (RebalanceOptions.)]
+        (.set_wait_secs options (Integer/parseInt wait-time))
+        (.rebalance nimbus name options)
+        (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+  (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)
+            options (KillOptions.)]
+        (.set_wait_secs options (Integer/parseInt wait-time))
+        (.killTopologyWithOpts nimbus name options)
+        (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+    (resp/redirect (str "/api/v1/topology/" id)))
 
   (GET "/" [:as {cookies :cookies}]
        (resp/redirect "/index.html"))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 531fa14..ff309a5 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,12 +475,6 @@ public class Config extends HashMap<String, Object> {
     public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
-     * The cgroup opts provided to workers launched by this supervisor.
-     */
-    public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
-    public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
-
-    /**
      * control how many worker receiver threads we need per worker
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index a4c8c2c..9965c81 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -33,12 +33,10 @@ json_decode = lambda x: json.loads(x)
 def readMsg():
     msg = ""
     while True:
-        line = sys.stdin.readline()
-	if not line:
-	    raise Exception('Read EOF from stdin')
-        if line[0:-1] == "end":
+        line = sys.stdin.readline()[0:-1]
+        if line == "end":
             break
-        msg = msg + line
+        msg = msg + line + "\n"
     return json_decode(msg[0:-1])
 
 MODE = None
@@ -137,7 +135,7 @@ def reportError(msg):
 
 def log(msg):
     sendMsgToParent({"command": "log", "msg": msg})
-    
+
 def rpcMetrics(name, params):
     sendMsgToParent({"command": "metrics", "name": name, "params": params})
 
@@ -182,13 +180,6 @@ class BasicBolt(object):
     def initialize(self, stormconf, context):
         pass
 
-    def redirect_stdout_to_stderr(self):
-        self.bakup_stdout = sys.stdout
-        sys.stdout = sys.stderr
-
-    def recover_stdout(self):
-        sys.stdout = self.bakup_stdout
-
     def process(self, tuple):
         pass
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 2ebab42..1020719 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -36,13 +36,11 @@
 <h2>Topology summary</h2>
 <div id="topology-summary">
 </div>
-<!--
 <div id="topology-actions">
 <h2 class="js-only">Topology actions</h2>
 <p id="topology-actions" class="js-only">
 </p>
 </div>
--->
 <div id="topology-stats"></div>
 <div id="spout-stats">
 </div>
@@ -79,13 +77,13 @@ $(document).ready(function() {
         var spoutStats = $("#spout-stats");
         var boltStats = $("#bolt-stats");
         var config = $("#topology-configuration");
-        //var topologyActions = $("#topology-actions");
+        var topologyActions = $("#topology-actions");
         var topologyVisualization = $("#topology-visualization")
         var formattedConfig = formatConfigData(response["configuration"]);
         var buttonJsonData = topologyActionJson(response["id"],response["name"],response["status"],response["msgTimeout"]);
         $.get("/templates/topology-page-template.html", function(template) {
             topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
-            //topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
+            topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
             topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response));
             $("#topology-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
             spoutStats.append(Mustache.render($(template).filter("#spout-stats-template").html(),response));

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 464ecd7..0d97c0b 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,11 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-<<<<<<< HEAD
-        <version>0.9.2-incubating-mt0000</version>
-=======
         <version>0.9.3-incubating-SNAPSHOT</version>
->>>>>>> upstream/master
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>


[07/24] git commit: merge 0.9.2 incubating

Posted by bo...@apache.org.
merge 0.9.2 incubating


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/bc391722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/bc391722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/bc391722

Branch: refs/heads/master
Commit: bc391722098c1e66f6ebe0377a3de6b51a7cf38c
Parents: e2a7f7c c89fb82
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 17:43:29 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 17:43:29 2014 +0800

----------------------------------------------------------------------
 .gitignore                                      |    4 +-
 CHANGELOG.md                                    |   64 +
 DEVELOPER.md                                    |  313 +++++
 NOTICE                                          |    6 +-
 README.markdown                                 |   12 +-
 SECURITY.md                                     |   74 ++
 bin/storm                                       |   59 +-
 bin/storm-config.cmd                            |   27 +-
 bin/storm.cmd                                   |   59 +-
 conf/defaults.yaml                              |   11 +-
 conf/storm.yaml.example                         |    2 +-
 conf/storm_env.ini                              |   30 +
 examples/storm-starter/README.markdown          |  127 ++
 .../multilang/resources/splitsentence.py        |   24 +
 .../multilang/resources/splitsentence.rb        |   26 +
 .../storm-starter/multilang/resources/storm.py  |  221 ++++
 .../storm-starter/multilang/resources/storm.rb  |  200 +++
 examples/storm-starter/pom.xml                  |  166 +++
 .../src/clj/storm/starter/clj/word_count.clj    |   95 ++
 .../jvm/storm/starter/BasicDRPCTopology.java    |   78 ++
 .../jvm/storm/starter/ExclamationTopology.java  |   87 ++
 .../src/jvm/storm/starter/ManualDRPC.java       |   68 +
 .../jvm/storm/starter/PrintSampleStream.java    |   58 +
 .../src/jvm/storm/starter/ReachTopology.java    |  196 +++
 .../src/jvm/storm/starter/RollingTopWords.java  |   78 ++
 .../jvm/storm/starter/SingleJoinExample.java    |   64 +
 .../storm/starter/TransactionalGlobalCount.java |  173 +++
 .../jvm/storm/starter/TransactionalWords.java   |  246 ++++
 .../jvm/storm/starter/WordCountTopology.java    |  107 ++
 .../storm/starter/bolt/AbstractRankerBolt.java  |  110 ++
 .../starter/bolt/IntermediateRankingsBolt.java  |   58 +
 .../src/jvm/storm/starter/bolt/PrinterBolt.java |   37 +
 .../storm/starter/bolt/RollingCountBolt.java    |  142 ++
 .../jvm/storm/starter/bolt/SingleJoinBolt.java  |  114 ++
 .../storm/starter/bolt/TotalRankingsBolt.java   |   59 +
 .../starter/spout/RandomSentenceSpout.java      |   64 +
 .../storm/starter/spout/TwitterSampleSpout.java |  164 +++
 .../tools/NthLastModifiedTimeTracker.java       |   70 +
 .../src/jvm/storm/starter/tools/Rankable.java   |   32 +
 .../starter/tools/RankableObjectWithFields.java |  148 +++
 .../src/jvm/storm/starter/tools/Rankings.java   |  156 +++
 .../starter/tools/SlidingWindowCounter.java     |  119 ++
 .../storm/starter/tools/SlotBasedCounter.java   |  118 ++
 .../jvm/storm/starter/trident/TridentReach.java |  156 +++
 .../storm/starter/trident/TridentWordCount.java |   85 ++
 .../src/jvm/storm/starter/util/StormRunner.java |   39 +
 .../jvm/storm/starter/util/TupleHelpers.java    |   33 +
 .../bolt/IntermediateRankingsBoltTest.java      |  146 +++
 .../starter/bolt/RollingCountBoltTest.java      |  113 ++
 .../starter/bolt/TotalRankingsBoltTest.java     |  147 +++
 .../storm/starter/tools/MockTupleHelpers.java   |   40 +
 .../tools/NthLastModifiedTimeTrackerTest.java   |  125 ++
 .../tools/RankableObjectWithFieldsTest.java     |  252 ++++
 .../jvm/storm/starter/tools/RankingsTest.java   |  368 ++++++
 .../starter/tools/SlidingWindowCounterTest.java |  106 ++
 .../starter/tools/SlotBasedCounterTest.java     |  181 +++
 external/storm-kafka/CHANGELOG.md               |   13 +
 external/storm-kafka/README.md                  |   28 +
 external/storm-kafka/pom.xml                    |  138 ++
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |   81 ++
 .../src/jvm/storm/kafka/BrokerHosts.java        |   25 +
 .../jvm/storm/kafka/DynamicBrokersReader.java   |  147 +++
 .../kafka/DynamicPartitionConnections.java      |   94 ++
 .../jvm/storm/kafka/FailedFetchException.java   |   29 +
 .../src/jvm/storm/kafka/KafkaConfig.java        |   52 +
 .../src/jvm/storm/kafka/KafkaError.java         |   43 +
 .../src/jvm/storm/kafka/KafkaSpout.java         |  195 +++
 .../src/jvm/storm/kafka/KafkaUtils.java         |  243 ++++
 .../src/jvm/storm/kafka/KeyValueScheme.java     |   28 +
 .../kafka/KeyValueSchemeAsMultiScheme.java      |   37 +
 .../src/jvm/storm/kafka/Partition.java          |   64 +
 .../jvm/storm/kafka/PartitionCoordinator.java   |   28 +
 .../src/jvm/storm/kafka/PartitionManager.java   |  265 ++++
 .../src/jvm/storm/kafka/SpoutConfig.java        |   36 +
 .../src/jvm/storm/kafka/StaticCoordinator.java  |   51 +
 .../src/jvm/storm/kafka/StaticHosts.java        |   38 +
 .../storm/kafka/StaticPartitionConnections.java |   52 +
 .../jvm/storm/kafka/StringKeyValueScheme.java   |   37 +
 .../src/jvm/storm/kafka/StringScheme.java       |   46 +
 .../src/jvm/storm/kafka/ZkCoordinator.java      |  113 ++
 .../src/jvm/storm/kafka/ZkHosts.java            |   36 +
 .../src/jvm/storm/kafka/ZkState.java            |  116 ++
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |   89 ++
 .../jvm/storm/kafka/trident/Coordinator.java    |   50 +
 .../storm/kafka/trident/DefaultCoordinator.java |   31 +
 .../trident/GlobalPartitionInformation.java     |   98 ++
 .../storm/kafka/trident/IBatchCoordinator.java  |   26 +
 .../jvm/storm/kafka/trident/IBrokerReader.java  |   25 +
 .../src/jvm/storm/kafka/trident/MaxMetric.java  |   40 +
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |   59 +
 .../storm/kafka/trident/StaticBrokerReader.java |   36 +
 .../trident/TransactionalTridentKafkaSpout.java |   58 +
 .../storm/kafka/trident/TridentKafkaConfig.java |   37 +
 .../kafka/trident/TridentKafkaEmitter.java      |  268 ++++
 .../jvm/storm/kafka/trident/ZkBrokerReader.java |   70 +
 .../storm/kafka/DynamicBrokersReaderTest.java   |  155 +++
 .../src/test/storm/kafka/KafkaErrorTest.java    |   39 +
 .../src/test/storm/kafka/KafkaTestBroker.java   |   75 ++
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  225 ++++
 .../storm/kafka/StringKeyValueSchemeTest.java   |   38 +
 .../src/test/storm/kafka/TestUtils.java         |   20 +
 .../src/test/storm/kafka/ZkCoordinatorTest.java |  130 ++
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  171 +++
 logback/cluster.xml                             |    2 +-
 pom.xml                                         |   69 +-
 storm-core/pom.xml                              |   49 +-
 .../clj/backtype/storm/command/rebalance.clj    |    7 +-
 storm-core/src/clj/backtype/storm/config.clj    |    2 +-
 .../src/clj/backtype/storm/daemon/common.clj    |    6 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |   17 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   43 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   41 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |    9 -
 .../clj/backtype/storm/daemon/supervisor.clj    |   66 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   76 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   10 +-
 .../src/clj/backtype/storm/messaging/loader.clj |   81 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   20 +-
 storm-core/src/clj/backtype/storm/testing.clj   |    5 +-
 storm-core/src/clj/backtype/storm/timer.clj     |    7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   | 1209 +++++++-----------
 .../src/clj/backtype/storm/ui/helpers.clj       |    5 +-
 storm-core/src/clj/backtype/storm/util.clj      |   17 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |   10 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   38 +-
 .../jvm/backtype/storm/ConfigValidation.java    |   17 +
 .../src/jvm/backtype/storm/StormSubmitter.java  |  176 ++-
 .../backtype/storm/messaging/IConnection.java   |   15 +-
 .../backtype/storm/messaging/netty/Client.java  |  333 +++--
 .../backtype/storm/messaging/netty/Context.java |   57 +-
 .../storm/messaging/netty/ControlMessage.java   |    6 +-
 .../storm/messaging/netty/MessageBatch.java     |   52 +-
 .../storm/messaging/netty/MessageDecoder.java   |  108 +-
 .../netty/NettyRenameThreadFactory.java         |   35 +
 .../backtype/storm/messaging/netty/Server.java  |  147 ++-
 .../netty/StormClientErrorHandler.java          |   41 +
 .../messaging/netty/StormClientHandler.java     |  121 --
 .../netty/StormClientPipelineFactory.java       |    2 +-
 .../messaging/netty/StormServerHandler.java     |   40 +-
 .../jvm/backtype/storm/multilang/BoltMsg.java   |   63 +
 .../backtype/storm/multilang/ISerializer.java   |   65 +
 .../storm/multilang/JsonSerializer.java         |  162 +++
 .../storm/multilang/NoOutputException.java      |   23 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  105 ++
 .../jvm/backtype/storm/multilang/SpoutMsg.java  |   34 +
 .../jvm/backtype/storm/spout/ShellSpout.java    |   86 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  115 +-
 .../backtype/storm/task/TopologyContext.java    |    9 +
 .../storm/testing/TestEventLogSpout.java        |  139 ++
 .../storm/testing/TestEventOrderCheckBolt.java  |   76 ++
 .../transactional/state/TransactionalState.java |    2 +-
 .../backtype/storm/utils/DisruptorQueue.java    |    9 +-
 .../jvm/backtype/storm/utils/ShellProcess.java  |  169 ++-
 .../src/jvm/backtype/storm/utils/Time.java      |   18 +-
 .../backtype/storm/utils/TransferDrainer.java   |  113 ++
 .../src/jvm/backtype/storm/utils/Utils.java     |  117 +-
 storm-core/src/jvm/storm/trident/Stream.java    |    5 +-
 .../trident/spout/RichSpoutBatchTriggerer.java  |    1 +
 .../jvm/storm/trident/state/map/OpaqueMap.java  |    6 +-
 .../trident/state/map/RemovableMapState.java    |    8 +
 .../storm/trident/testing/MemoryMapState.java   |   27 +-
 .../topology/state/TransactionalState.java      |    2 +-
 .../jvm/storm/trident/util/TridentUtils.java    |   14 +-
 storm-core/src/ui/public/component.html         |   88 ++
 storm-core/src/ui/public/index.html             |   73 ++
 storm-core/src/ui/public/js/arbor-graphics.js   |   51 +
 storm-core/src/ui/public/js/arbor-tween.js      |   86 ++
 storm-core/src/ui/public/js/arbor.js            |   67 +
 storm-core/src/ui/public/js/jquery.mustache.js  |  592 +++++++++
 storm-core/src/ui/public/js/purl.js             |  267 ++++
 storm-core/src/ui/public/js/script.js           |   51 +-
 storm-core/src/ui/public/js/visualization.js    |  403 ++++++
 .../templates/component-page-template.html      |  152 +++
 .../public/templates/index-page-template.html   |   62 +
 .../public/templates/json-error-template.html   |    4 +
 .../templates/topology-page-template.html       |  128 ++
 storm-core/src/ui/public/topology.html          |   90 ++
 .../test/clj/backtype/storm/config_test.clj     |   20 +
 .../test/clj/backtype/storm/drpc_test.clj       |   27 +-
 .../clj/backtype/storm/integration_test.clj     |    9 +-
 .../storm/messaging/netty_unit_test.clj         |   46 +-
 .../test/clj/backtype/storm/messaging_test.clj  |   35 +-
 .../test/clj/backtype/storm/metrics_test.clj    |   24 +
 .../test/clj/backtype/storm/multilang_test.clj  |    6 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |   16 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   66 +-
 .../test/clj/backtype/storm/utils_test.clj      |   13 +-
 .../test/clj/storm/trident/integration_test.clj |    2 +-
 .../test/clj/storm/trident/state_test.clj       |   33 +-
 storm-dist/binary/NOTICE                        |    9 +-
 storm-dist/binary/pom.xml                       |   40 +-
 storm-dist/binary/src/main/assembly/binary.xml  |   41 +
 storm-dist/source/pom.xml                       |   38 +-
 193 files changed, 14785 insertions(+), 1693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bc391722/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index f54cf72,d6a18e7..368f16a
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@@ -18,18 -18,15 +18,18 @@@
  package backtype.storm.spout;
  
  import backtype.storm.generated.ShellComponent;
 +import backtype.storm.metric.api.IMetric;
 +import backtype.storm.metric.api.rpc.IShellMetric;
+ import backtype.storm.multilang.ShellMsg;
+ import backtype.storm.multilang.SpoutMsg;
  import backtype.storm.task.TopologyContext;
  import backtype.storm.utils.ShellProcess;
  import java.util.Map;
  import java.util.List;
  import java.io.IOException;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
- import org.json.simple.JSONObject;
  
  
  public class ShellSpout implements ISpout {
@@@ -38,9 -35,8 +38,11 @@@
      private SpoutOutputCollector _collector;
      private String[] _command;
      private ShellProcess _process;
 +    
 +    private TopologyContext _context;
 +    
+     private SpoutMsg _spoutMsg;
+ 
      public ShellSpout(ShellComponent component) {
          this(component.get_execution_command(), component.get_script());
      }
@@@ -48,118 -44,75 +50,110 @@@
      public ShellSpout(String... command) {
          _command = command;
      }
-     
+ 
      public void open(Map stormConf, TopologyContext context,
                       SpoutOutputCollector collector) {
-         _process = new ShellProcess(_command);
          _collector = collector;
 +        _context = context;
  
-         try {
-             Number subpid = _process.launch(stormConf, context);
-             LOG.info("Launched subprocess with pid " + subpid);
-         } catch (IOException e) {
-             throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
-         }
+         _process = new ShellProcess(_command);
+ 
+         Number subpid = _process.launch(stormConf, context);
+         LOG.info("Launched subprocess with pid " + subpid);
      }
  
      public void close() {
          _process.destroy();
      }
  
-     private JSONObject _next;
      public void nextTuple() {
-         if (_next == null) {
-             _next = new JSONObject();
-             _next.put("command", "next");
+         if (_spoutMsg == null) {
+             _spoutMsg = new SpoutMsg();
          }
- 
-         querySubprocess(_next);
+         _spoutMsg.setCommand("next");
+         _spoutMsg.setId("");
+         querySubprocess();
      }
  
-     private JSONObject _ack;
      public void ack(Object msgId) {
-         if (_ack == null) {
-             _ack = new JSONObject();
-             _ack.put("command", "ack");
+         if (_spoutMsg == null) {
+             _spoutMsg = new SpoutMsg();
          }
- 
-         _ack.put("id", msgId);
-         querySubprocess(_ack);
+         _spoutMsg.setCommand("ack");
+         _spoutMsg.setId(msgId);
+         querySubprocess();
      }
  
-     private JSONObject _fail;
      public void fail(Object msgId) {
-         if (_fail == null) {
-             _fail = new JSONObject();
-             _fail.put("command", "fail");
+         if (_spoutMsg == null) {
+             _spoutMsg = new SpoutMsg();
          }
- 
-         _fail.put("id", msgId);
-         querySubprocess(_fail);
+         _spoutMsg.setCommand("fail");
+         _spoutMsg.setId(msgId);
+         querySubprocess();
      }
 +    
 +    private void handleMetrics(Map action) {
 +        //get metric name
 +        Object nameObj = action.get("name");
 +        if (nameObj == null || !(nameObj instanceof String) ) {
 +            throw new RuntimeException("Receive Metrics name is null or is not String");
 +        }
 +        String name = (String) nameObj;
 +        if (name.isEmpty()) {
 +            throw new RuntimeException("Receive Metrics name is empty");
 +        }
 +        
 +        //get metric by name
 +        IMetric iMetric = _context.getRegisteredMetricByName(name);
 +        if (iMetric == null) {
 +            throw new RuntimeException("Not find metric by name["+name+"] ");
 +        }
 +        if ( !(iMetric instanceof IShellMetric)) {
 +            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
 +        }
 +        IShellMetric iShellMetric = (IShellMetric)iMetric;
 +        
 +        //call updateMetricFromRPC with params
 +        Object paramsObj = action.get("params");
 +        try {
 +            iShellMetric.updateMetricFromRPC(paramsObj);
 +        } catch (RuntimeException re) {
 +            throw re;
 +        } catch (Exception e) {
 +            throw new RuntimeException(e);
 +        }       
 +    }
  
-     private void querySubprocess(Object query) {
+     private void querySubprocess() {
          try {
-             _process.writeMessage(query);
+             _process.writeSpoutMsg(_spoutMsg);
  
              while (true) {
-                 JSONObject action = _process.readMessage();
-                 String command = (String) action.get("command");
+                 ShellMsg shellMsg = _process.readShellMsg();
+                 String command = shellMsg.getCommand();
                  if (command.equals("sync")) {
                      return;
                  } else if (command.equals("log")) {
-                     String msg = (String) action.get("msg");
+                     String msg = shellMsg.getMsg();
                      LOG.info("Shell msg: " + msg);
                  } else if (command.equals("emit")) {
-                     String stream = (String) action.get("stream");
-                     if (stream == null) stream = Utils.DEFAULT_STREAM_ID;
-                     Long task = (Long) action.get("task");
-                     List<Object> tuple = (List) action.get("tuple");
-                     Object messageId = (Object) action.get("id");
-                     if (task == null) {
+                     String stream = shellMsg.getStream();
+                     Long task = shellMsg.getTask();
+                     List<Object> tuple = shellMsg.getTuple();
+                     Object messageId = shellMsg.getId();
+                     if (task == 0) {
                          List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
-                         Object need_task_ids = action.get("need_task_ids");
-                         if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
-                             _process.writeMessage(outtasks);
+                         if (shellMsg.areTaskIdsNeeded()) {
+                             _process.writeTaskIds(outtasks);
                          }
                      } else {
-                         _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
+                         _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
                      }
 +                } else if (command.equals("metrics")) {
 +                    handleMetrics(action);
+                 } else {
+                     throw new RuntimeException("Unknown command received: " + command);
                  }
              }
          } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bc391722/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index de3c3f2,81aca02..50d6e20
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@@ -19,15 -19,13 +19,16 @@@ package backtype.storm.task
  
  import backtype.storm.Config;
  import backtype.storm.generated.ShellComponent;
 +import backtype.storm.metric.api.IMetric;
 +import backtype.storm.metric.api.rpc.IShellMetric;
 +import backtype.storm.tuple.MessageId;
  import backtype.storm.tuple.Tuple;
- import backtype.storm.utils.Utils;
  import backtype.storm.utils.ShellProcess;
- import java.io.IOException;
+ import backtype.storm.multilang.BoltMsg;
+ import backtype.storm.multilang.ShellMsg;
+ 
  import java.util.ArrayList;
- import java.util.Arrays;
+ import java.util.List;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.LinkedBlockingQueue;
  import static java.util.concurrent.TimeUnit.SECONDS;
@@@ -77,11 -73,9 +76,11 @@@ public class ShellBolt implements IBol
      private volatile Throwable _exception;
      private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
      private Random _rand;
-     
+ 
      private Thread _readerThread;
      private Thread _writerThread;
 +    
 +    private TopologyContext _context;
  
      public ShellBolt(ShellComponent component) {
          this(component.get_execution_command(), component.get_script());
@@@ -98,42 -92,32 +97,42 @@@
             this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
          }
          _rand = new Random();
-         _process = new ShellProcess(_command);
          _collector = collector;
++<<<<<<< HEAD
 +        _context = context;
++=======
+         _process = new ShellProcess(_command);
++>>>>>>> upstream/master
  
-         try {
-             //subprocesses must send their pid first thing
-             Number subpid = _process.launch(stormConf, context);
-             LOG.info("Launched subprocess with pid " + subpid);
-         } catch (IOException e) {
-             throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
-         }
+         //subprocesses must send their pid first thing
+         Number subpid = _process.launch(stormConf, context);
+         LOG.info("Launched subprocess with pid " + subpid);
  
          // reader
          _readerThread = new Thread(new Runnable() {
              public void run() {
                  while (_running) {
                      try {
-                         JSONObject action = _process.readMessage();
-                         if (action == null) {
-                             // ignore sync
-                         }
+                         ShellMsg shellMsg = _process.readShellMsg();
  
-                         String command = (String) action.get("command");
+                         String command = shellMsg.getCommand();
                          if(command.equals("ack")) {
-                             handleAck(action);
+                             handleAck(shellMsg.getId());
                          } else if (command.equals("fail")) {
-                             handleFail(action);
+                             handleFail(shellMsg.getId());
                          } else if (command.equals("error")) {
-                             handleError(action);
+                             handleError(shellMsg.getMsg());
                          } else if (command.equals("log")) {
-                             String msg = (String) action.get("msg");
+                             String msg = shellMsg.getMsg();
                              LOG.info("Shell msg: " + msg);
                          } else if (command.equals("emit")) {
++<<<<<<< HEAD
 +                            handleEmit(action);
 +                        } else if (command.equals("metrics")) {
 +                            handleMetrics(action);
++=======
+                             handleEmit(shellMsg);
++>>>>>>> upstream/master
                          }
                      } catch (InterruptedException e) {
                      } catch (Throwable t) {
@@@ -192,9 -179,8 +194,14 @@@
          _process.destroy();
          _inputs.clear();
      }
++<<<<<<< HEAD
 +    
 +    private void handleAck(Map action) {
 +        String id = (String) action.get("id");
++=======
+ 
+     private void handleAck(Object id) {
++>>>>>>> upstream/master
          Tuple acked = _inputs.remove(id);
          if(acked==null) {
              throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
@@@ -242,41 -219,10 +240,42 @@@
                  _pendingWrites.put(outtasks);
              }
          } else {
-             _collector.emitDirect((int)task.longValue(), stream, anchors, tuple);
+             _collector.emitDirect((int) shellMsg.getTask(),
+                     shellMsg.getStream(), anchors, shellMsg.getTuple());
          }
      }
 +    
 +    private void handleMetrics(Map action) {
 +        //get metric name
 +        Object nameObj = action.get("name");
 +        if (nameObj == null || !(nameObj instanceof String) ) {
 +            throw new RuntimeException("Receive Metrics name is null or is not String");
 +        }
 +        String name = (String) nameObj;
 +        if (name.isEmpty()) {
 +            throw new RuntimeException("Receive Metrics name is empty");
 +        }
 +        
 +        //get metric by name
 +        IMetric iMetric = _context.getRegisteredMetricByName(name);
 +        if (iMetric == null) {
 +            throw new RuntimeException("Not find metric by name["+name+"] ");
 +        }
 +        if ( !(iMetric instanceof IShellMetric)) {
 +            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
 +        }
 +        IShellMetric iShellMetric = (IShellMetric)iMetric;
 +        
 +        //call updateMetricFromRPC with params
 +        Object paramsObj = action.get("params");
 +        try {
 +            iShellMetric.updateMetricFromRPC(paramsObj);
 +        } catch (RuntimeException re) {
 +            throw re;
 +        } catch (Exception e) {
 +            throw new RuntimeException(e);
 +        }       
 +    }
  
      private void die(Throwable exception) {
          _exception = exception;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bc391722/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------


[14/24] git commit: merge apache-storm master

Posted by bo...@apache.org.
merge apache-storm master


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/22a6ca9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/22a6ca9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/22a6ca9b

Branch: refs/heads/master
Commit: 22a6ca9b94cf68709c16436acc7ad4ac6c7c9d7a
Parents: 4d6a27f 2a60e99
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Jun 12 18:08:02 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Jun 12 18:08:02 2014 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   2 +-
 LICENSE                                         | 124 ++++++-
 examples/storm-starter/pom.xml                  |   5 +-
 external/storm-kafka/README.md                  |  33 +-
 external/storm-kafka/pom.xml                    |  27 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   |  17 +
 .../src/test/storm/kafka/KafkaErrorTest.java    |  17 +
 .../src/test/storm/kafka/KafkaTestBroker.java   |  17 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  17 +
 .../storm/kafka/StringKeyValueSchemeTest.java   |  17 +
 .../src/test/storm/kafka/TestUtils.java         |  17 +
 .../src/test/storm/kafka/ZkCoordinatorTest.java |  17 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  17 +
 pom.xml                                         |  40 ++-
 storm-core/dependency-reduced-pom.xml           | 359 +++++++++++++++++++
 storm-core/pom.xml                              |   4 +
 .../netty/NettyRenameThreadFactory.java         |  17 +
 .../jvm/backtype/storm/multilang/BoltMsg.java   |  17 +
 .../backtype/storm/multilang/ISerializer.java   |  17 +
 .../storm/multilang/JsonSerializer.java         |  17 +
 .../storm/multilang/NoOutputException.java      |  17 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  17 +
 .../jvm/backtype/storm/multilang/SpoutMsg.java  |  17 +
 .../trident/state/map/RemovableMapState.java    |  17 +
 storm-core/src/ui/public/component.html         |  16 +
 storm-core/src/ui/public/index.html             |  16 +
 storm-core/src/ui/public/js/visualization.js    |  20 +-
 .../templates/component-page-template.html      |  17 +
 .../public/templates/index-page-template.html   |  16 +
 .../public/templates/json-error-template.html   |  16 +
 .../templates/topology-page-template.html       |  16 +
 storm-core/src/ui/public/topology.html          |  17 +
 storm-dist/binary/LICENSE                       | 154 +++++++-
 storm-dist/binary/NOTICE                        |   2 +-
 storm-dist/binary/pom.xml                       |   4 +
 storm-dist/source/pom.xml                       |   2 +-
 36 files changed, 1122 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/examples/storm-starter/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/external/storm-kafka/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --cc storm-core/dependency-reduced-pom.xml
index 0000000,0000000..9dacd73
new file mode 100644
--- /dev/null
+++ b/storm-core/dependency-reduced-pom.xml
@@@ -1,0 -1,0 +1,359 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
++  <parent>
++    <artifactId>storm</artifactId>
++    <groupId>org.apache.storm</groupId>
++    <version>0.9.2-incubating-mt0000</version>
++  </parent>
++  <modelVersion>4.0.0</modelVersion>
++  <groupId>org.apache.storm</groupId>
++  <artifactId>storm-core</artifactId>
++  <name>Storm Core</name>
++  <description>Storm Core Java API and Clojure implementation.</description>
++  <build>
++    <sourceDirectory>src/jvm</sourceDirectory>
++    <resources>
++      <resource>
++        <directory>../conf</directory>
++      </resource>
++      <resource>
++        <targetPath>META-INF</targetPath>
++        <directory>../</directory>
++        <includes>
++          <include>NOTICE</include>
++        </includes>
++      </resource>
++    </resources>
++    <testResources>
++      <testResource>
++        <directory>src/dev</directory>
++      </testResource>
++      <testResource>
++        <directory>test/resources</directory>
++      </testResource>
++    </testResources>
++    <plugins>
++      <plugin>
++        <groupId>com.theoryinpractise</groupId>
++        <artifactId>clojure-maven-plugin</artifactId>
++        <extensions>true</extensions>
++        <executions>
++          <execution>
++            <id>compile-clojure</id>
++            <phase>compile</phase>
++            <goals>
++              <goal>compile</goal>
++            </goals>
++          </execution>
++          <execution>
++            <id>test-clojure</id>
++            <phase>test</phase>
++            <goals>
++              <goal>test-with-junit</goal>
++            </goals>
++            <configuration>
++              <vmargs>${test.extra.args}</vmargs>
++            </configuration>
++          </execution>
++        </executions>
++        <configuration>
++          <sourceDirectories>
++            <sourceDirectory>src/clj</sourceDirectory>
++          </sourceDirectories>
++          <testSourceDirectories>
++            <testSourceDirectory>test/clj</testSourceDirectory>
++          </testSourceDirectories>
++          <warnOnReflection>false</warnOnReflection>
++          <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
++          <copiedNamespaces>
++            <copiedNamespace>none</copiedNamespace>
++          </copiedNamespaces>
++        </configuration>
++      </plugin>
++      <plugin>
++        <artifactId>maven-surefire-report-plugin</artifactId>
++        <configuration>
++          <reportsDirectories>
++            <file>${project.build.directory}/test-reports</file>
++          </reportsDirectories>
++        </configuration>
++      </plugin>
++      <plugin>
++        <artifactId>maven-shade-plugin</artifactId>
++        <version>2.2</version>
++        <executions>
++          <execution>
++            <phase>package</phase>
++            <goals>
++              <goal>shade</goal>
++            </goals>
++          </execution>
++        </executions>
++        <dependencies>
++          <dependency>
++            <groupId>org.apache.storm</groupId>
++            <artifactId>maven-shade-clojure-transformer</artifactId>
++            <version>${project.version}</version>
++          </dependency>
++        </dependencies>
++        <configuration>
++          <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
++          <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
++          <createDependencyReducedPom>true</createDependencyReducedPom>
++          <minimizeJar>false</minimizeJar>
++          <artifactSet>
++            <includes>
++              <include>org.apache.thrift:*</include>
++              <include>org.apache.storm:*</include>
++            </includes>
++          </artifactSet>
++          <relocations>
++            <relocation>
++              <pattern>org.apache.thrift</pattern>
++              <shadedPattern>org.apache.thrift7</shadedPattern>
++            </relocation>
++          </relocations>
++          <transformers>
++            <transformer />
++          </transformers>
++          <filters>
++            <filter>
++              <artifact>org.apache.thrift:*</artifact>
++              <excludes>
++                <exclude>META-INF/LICENSE.txt</exclude>
++                <exclude>META-INF/NOTICE.txt</exclude>
++              </excludes>
++            </filter>
++          </filters>
++        </configuration>
++      </plugin>
++    </plugins>
++  </build>
++  <dependencies>
++    <dependency>
++      <groupId>org.clojure</groupId>
++      <artifactId>clojure</artifactId>
++      <version>1.5.1</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>clj-time</groupId>
++      <artifactId>clj-time</artifactId>
++      <version>0.4.1</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>compojure</groupId>
++      <artifactId>compojure</artifactId>
++      <version>1.1.3</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>hiccup</groupId>
++      <artifactId>hiccup</artifactId>
++      <version>0.3.6</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>ring</groupId>
++      <artifactId>ring-devel</artifactId>
++      <version>0.3.11</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>ring</groupId>
++      <artifactId>ring-jetty-adapter</artifactId>
++      <version>0.3.11</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.clojure</groupId>
++      <artifactId>tools.logging</artifactId>
++      <version>0.2.3</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.clojure</groupId>
++      <artifactId>math.numeric-tower</artifactId>
++      <version>0.0.1</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.clojure</groupId>
++      <artifactId>tools.cli</artifactId>
++      <version>0.2.4</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.clojure</groupId>
++      <artifactId>tools.nrepl</artifactId>
++      <version>0.2.3</version>
++      <scope>test</scope>
++      <exclusions>
++        <exclusion>
++          <artifactId>clojure</artifactId>
++          <groupId>org.clojure</groupId>
++        </exclusion>
++      </exclusions>
++    </dependency>
++    <dependency>
++      <groupId>clojure-complete</groupId>
++      <artifactId>clojure-complete</artifactId>
++      <version>0.2.3</version>
++      <scope>test</scope>
++      <exclusions>
++        <exclusion>
++          <artifactId>clojure</artifactId>
++          <groupId>org.clojure</groupId>
++        </exclusion>
++      </exclusions>
++    </dependency>
++    <dependency>
++      <groupId>commons-io</groupId>
++      <artifactId>commons-io</artifactId>
++      <version>2.4</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.commons</groupId>
++      <artifactId>commons-exec</artifactId>
++      <version>1.1</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>commons-lang</groupId>
++      <artifactId>commons-lang</artifactId>
++      <version>2.5</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.thrift</groupId>
++      <artifactId>libthrift</artifactId>
++      <version>0.7.0</version>
++      <scope>provided</scope>
++      <exclusions>
++        <exclusion>
++          <artifactId>slf4j-api</artifactId>
++          <groupId>org.slf4j</groupId>
++        </exclusion>
++        <exclusion>
++          <artifactId>servlet-api</artifactId>
++          <groupId>javax.servlet</groupId>
++        </exclusion>
++      </exclusions>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.curator</groupId>
++      <artifactId>curator-framework</artifactId>
++      <version>2.4.0</version>
++      <scope>compile</scope>
++      <exclusions>
++        <exclusion>
++          <artifactId>log4j</artifactId>
++          <groupId>log4j</groupId>
++        </exclusion>
++        <exclusion>
++          <artifactId>slf4j-log4j12</artifactId>
++          <groupId>org.slf4j</groupId>
++        </exclusion>
++      </exclusions>
++    </dependency>
++    <dependency>
++      <groupId>com.googlecode.json-simple</groupId>
++      <artifactId>json-simple</artifactId>
++      <version>1.1</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>com.twitter</groupId>
++      <artifactId>carbonite</artifactId>
++      <version>1.4.0</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.yaml</groupId>
++      <artifactId>snakeyaml</artifactId>
++      <version>1.11</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.httpcomponents</groupId>
++      <artifactId>httpclient</artifactId>
++      <version>4.3.3</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>com.googlecode.disruptor</groupId>
++      <artifactId>disruptor</artifactId>
++      <version>2.10.1</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.jgrapht</groupId>
++      <artifactId>jgrapht-core</artifactId>
++      <version>0.9.0</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>com.google.guava</groupId>
++      <artifactId>guava</artifactId>
++      <version>13.0</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>ch.qos.logback</groupId>
++      <artifactId>logback-classic</artifactId>
++      <version>1.0.6</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.slf4j</groupId>
++      <artifactId>log4j-over-slf4j</artifactId>
++      <version>1.6.6</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>io.netty</groupId>
++      <artifactId>netty</artifactId>
++      <version>3.6.3.Final</version>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.mockito</groupId>
++      <artifactId>mockito-all</artifactId>
++      <version>1.9.5</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.clojars.runa</groupId>
++      <artifactId>conjure</artifactId>
++      <version>2.1.3</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>reply</groupId>
++      <artifactId>reply</artifactId>
++      <version>0.3.0</version>
++      <scope>provided</scope>
++      <exclusions>
++        <exclusion>
++          <artifactId>cd-client</artifactId>
++          <groupId>org.thnetos</groupId>
++        </exclusion>
++        <exclusion>
++          <artifactId>drawbridge</artifactId>
++          <groupId>com.cemerick</groupId>
++        </exclusion>
++        <exclusion>
++          <artifactId>versioneer</artifactId>
++          <groupId>trptcolin</groupId>
++        </exclusion>
++        <exclusion>
++          <artifactId>sjacket</artifactId>
++          <groupId>org.clojars.trptcolin</groupId>
++        </exclusion>
++      </exclusions>
++    </dependency>
++  </dependencies>
++</project>
++

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/pom.xml
----------------------------------------------------------------------
diff --cc storm-core/pom.xml
index 440214d,134eeb8..91cd370
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@@ -20,7 -20,7 +20,11 @@@
      <parent>
          <artifactId>storm</artifactId>
          <groupId>org.apache.storm</groupId>
++<<<<<<< HEAD
 +        <version>0.9.2-incubating-mt0000</version>
++=======
+         <version>0.9.3-incubating-SNAPSHOT</version>
++>>>>>>> upstream/master
      </parent>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --cc storm-dist/binary/pom.xml
index bb1b876,0d97c0b..464ecd7
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@@ -21,7 -21,7 +21,11 @@@
      <parent>
          <artifactId>storm</artifactId>
          <groupId>org.apache.storm</groupId>
++<<<<<<< HEAD
 +        <version>0.9.2-incubating-mt0000</version>
++=======
+         <version>0.9.3-incubating-SNAPSHOT</version>
++>>>>>>> upstream/master
          <relativePath>../../pom.xml</relativePath>
      </parent>
      <groupId>org.apache.storm</groupId>


[16/24] git commit: Merge remote-tracking branch 'upstream/master'

Posted by bo...@apache.org.
Merge remote-tracking branch 'upstream/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/62f854a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/62f854a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/62f854a8

Branch: refs/heads/master
Commit: 62f854a81bc643181bc38b1e3dc6d1164a94d874
Parents: a4b26af d448e34
Author: JuDasheng <ju...@meituan.com>
Authored: Fri Jun 13 15:30:09 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Fri Jun 13 15:30:09 2014 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   3 +
 DEVELOPER.md                                    |   8 +
 storm-core/pom.xml                              |   7 +
 .../src/clj/backtype/storm/LocalCluster.clj     |  71 +-
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |   5 +-
 storm-core/src/clj/backtype/storm/bootstrap.clj |   9 +-
 storm-core/src/clj/backtype/storm/clojure.clj   |  10 +-
 storm-core/src/clj/backtype/storm/cluster.clj   | 427 +++++------
 storm-core/src/clj/backtype/storm/config.clj    | 145 ++--
 .../src/clj/backtype/storm/daemon/drpc.clj      | 135 ++--
 storm-core/src/clj/backtype/storm/disruptor.clj |  80 +--
 storm-core/src/clj/backtype/storm/event.clj     |  49 +-
 storm-core/src/clj/backtype/storm/log.clj       |  22 +-
 .../clj/backtype/storm/process_simulator.clj    |  27 +-
 storm-core/src/clj/backtype/storm/stats.clj     | 289 ++++----
 storm-core/src/clj/backtype/storm/testing.clj   | 523 +++++++-------
 storm-core/src/clj/backtype/storm/testing4j.clj |  58 +-
 storm-core/src/clj/backtype/storm/thrift.clj    | 255 ++++---
 storm-core/src/clj/backtype/storm/timer.clj     | 106 +--
 storm-core/src/clj/backtype/storm/tuple.clj     |   7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   | 559 ++++++++-------
 storm-core/src/clj/backtype/storm/util.clj      | 717 ++++++++++---------
 storm-core/src/clj/backtype/storm/zookeeper.clj | 158 ++--
 .../netty/NettyRenameThreadFactory.java         |   2 +-
 .../backtype/storm/task/TopologyContext.java    |  11 +
 .../backtype/storm/utils/DisruptorQueue.java    |  70 +-
 .../storm/utils/DisruptorQueueTest.java         | 153 ++++
 27 files changed, 2167 insertions(+), 1739 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/62f854a8/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------


[19/24] git commit: change log message in Could not find metric by name

Posted by bo...@apache.org.
change log message in Could not find metric by name


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d854ecca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d854ecca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d854ecca

Branch: refs/heads/master
Commit: d854ecca6a259020aeb2a2acf0bba6f1089531aa
Parents: e33023c
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Jun 19 10:43:18 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Jun 19 10:43:18 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 2 +-
 storm-core/src/jvm/backtype/storm/task/ShellBolt.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d854ecca/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 9f4b38e..3f80147 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -103,7 +103,7 @@ public class ShellSpout implements ISpout {
         //get metric by name
         IMetric iMetric = _context.getRegisteredMetricByName(name);
         if (iMetric == null) {
-            throw new RuntimeException("Not find metric by name["+name+"] ");
+            throw new RuntimeException("Could not find metric by name["+name+"] ");
         }
         if ( !(iMetric instanceof IShellMetric)) {
             throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d854ecca/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index b02a7fd..5ab327e 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -244,7 +244,7 @@ public class ShellBolt implements IBolt {
         //get metric by name
         IMetric iMetric = _context.getRegisteredMetricByName(name);
         if (iMetric == null) {
-            throw new RuntimeException("Not find metric by name["+name+"] ");
+            throw new RuntimeException("Could not find metric by name["+name+"] ");
         }
         if ( !(iMetric instanceof IShellMetric)) {
             throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");


[22/24] git commit: resolve the metrics_test hang problem with d2r's patch

Posted by bo...@apache.org.
resolve the metrics_test hang problem with d2r's patch


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/573c42a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/573c42a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/573c42a6

Branch: refs/heads/master
Commit: 573c42a64885dac9a6a0d4c69a754500b607a8f1
Parents: b004e06
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jul 1 22:30:06 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jul 1 22:30:06 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/573c42a6/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 32f7f88..1a5f53e 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -230,7 +230,10 @@
          (log-error t# "Error in cluster")
          (throw t#))
        (finally
-         (kill-local-storm-cluster ~cluster-sym)))))
+         (let [keep-waiting?# (atom true)]
+           (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))
+           (kill-local-storm-cluster ~cluster-sym)
+           (reset! keep-waiting?# false))))))
 
 (defmacro with-simulated-time-local-cluster
   [& args]


[11/24] git commit: change version and modify for deploy

Posted by bo...@apache.org.
change version and modify for deploy


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b3191fc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b3191fc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b3191fc5

Branch: refs/heads/master
Commit: b3191fc51cbe4f7fa442b27dd8e113159bc7804f
Parents: fbaf727
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 11 18:06:27 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 11 18:06:27 2014 +0800

----------------------------------------------------------------------
 build.sh                                        |  9 +++++++++
 examples/storm-starter/pom.xml                  |  2 +-
 external/storm-kafka/pom.xml                    |  2 +-
 pom.xml                                         | 20 +++++++++++++-------
 .../maven-shade-clojure-transformer/pom.xml     |  4 ++--
 storm-core/pom.xml                              |  2 +-
 6 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/build.sh
----------------------------------------------------------------------
diff --git a/build.sh b/build.sh
new file mode 100755
index 0000000..2563c33
--- /dev/null
+++ b/build.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+mvn clean install -DskipTests=true 
+
+cd storm-dist/binary 
+
+mvn package
+
+cd -

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 11146b6..bc4b4b3 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -21,7 +21,7 @@
   <parent>
       <artifactId>storm</artifactId>
       <groupId>org.apache.storm</groupId>
-      <version>0.9.2-incubating-SNAPSHOT</version>
+      <version>0.9.2-incubating-mt0000</version>
       <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 1210dcd..a8f718e 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.2-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-mt0000</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2c0cbc1..52fe369 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,16 +18,17 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
+    <!--
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
         <version>10</version>
     </parent>
-
+    -->
 
     <groupId>org.apache.storm</groupId>
     <artifactId>storm</artifactId>
-    <version>0.9.2-incubating-SNAPSHOT</version>
+    <version>0.9.2-incubating-mt0000</version>
     <packaging>pom</packaging>
     <name>Storm</name>
     <description>Distributed and fault-tolerant realtime computation</description>
@@ -254,11 +255,16 @@
     </profiles>
 
     <distributionManagement>
-        <site>
-            <id>storm.maven.website</id>
-            <name>Storm Website</name>
-            <url>file:///tmp/site</url>
-        </site>
+        <repository>
+            <id>nexus-releases</id>
+            <name>nexus-releases</name>
+            <url>http://nexus.sankuai.com:8081/nexus/content/repositories/releases</url>
+        </repository>
+        <snapshotRepository>
+            <id>nexus-snapshots</id>
+            <name>nexus-snapshots</name>
+            <url>http://nexus.sankuai.com:8081/nexus/content/repositories/snapshots</url>
+        </snapshotRepository>
     </distributionManagement>
 
     <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/storm-buildtools/maven-shade-clojure-transformer/pom.xml
----------------------------------------------------------------------
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 7dfa2a2..1c944d5 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.2-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-mt0000</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
@@ -36,4 +36,4 @@
             <scope>provided</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..440214d 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.2-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-mt0000</version>
     </parent>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>