You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:32 UTC
[01/24] git commit: add patch for new version multilang metric support
Repository: incubator-storm
Updated Branches:
refs/heads/master 18a0721d8 -> 8da9572ac
add patch for new version multilang metric support
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7a21ae00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7a21ae00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7a21ae00
Branch: refs/heads/master
Commit: 7a21ae00d18967b9539d5febb9a04dabefb1c8de
Parents: 254ec13
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Mar 5 14:22:02 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Mar 5 14:22:02 2014 +0800
----------------------------------------------------------------------
.../metric/api/rpc/CombinedShellMetric.java | 33 +++++++++++
.../storm/metric/api/rpc/CountShellMetric.java | 38 ++++++++++++
.../storm/metric/api/rpc/IShellMetric.java | 33 +++++++++++
.../metric/api/rpc/ReducedShellMetric.java | 32 ++++++++++
.../jvm/backtype/storm/spout/ShellSpout.java | 61 ++++++++++++++++++++
.../src/jvm/backtype/storm/task/ShellBolt.java | 59 +++++++++++++++++++
.../backtype/storm/task/TopologyContext.java | 20 ++++++-
storm-core/src/multilang/py/storm.py | 3 +
8 files changed, 278 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
new file mode 100644
index 0000000..fd940a7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+
+public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+
+ public CombinedShellMetric(ICombiner combiner) {
+ super(combiner);
+ }
+
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
new file mode 100644
index 0000000..1779223
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CountMetric;
+
+public class CountShellMetric extends CountMetric implements IShellMetric {
+ /***
+ * @param
+ * params should be null or long
+ * if value is null, it will call incr()
+ * if value is long, it will call incrBy((long)params)
+ * */
+ public void updateMetricFromRPC(Object value) {
+ if (value == null) {
+ incr();
+ } else if (value instanceof Long) {
+ incrBy((Long)value);
+ } else {
+ throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
new file mode 100644
index 0000000..9bec3a1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IMetric;
+
+public interface IShellMetric extends IMetric {
+ public static final String SHELL_METRICS_UPDATE_METHOD_NAME = "updateMetricFromRPC";
+
+ /***
+ * @function
+ * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric
+ * @param
+ * value used to update metric, its's meaning change according implementation
+ * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
+ * */
+ public void updateMetricFromRPC(Object value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
new file mode 100644
index 0000000..727a709
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
+
+ public ReducedShellMetric(IReducer reducer) {
+ super(reducer);
+ }
+
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 67cb66f..e0abe8a 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,12 +18,17 @@
package backtype.storm.spout;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.json.simple.JSONObject;
@@ -35,6 +40,8 @@ public class ShellSpout implements ISpout {
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
+
+ private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
@@ -92,6 +99,49 @@ public class ShellSpout implements ISpout {
_fail.put("id", msgId);
querySubprocess(_fail);
}
+
+ private void handleMetrics(Map action) {
+ //get metrics
+ Object nameObj = action.get("name");
+ if ( !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is not String");
+ }
+ String name = (String) nameObj;
+ if (name == null || name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is NULL");
+ }
+ if ( !_registeredShellMetrics.containsKey(name)) {
+ throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
+ }
+ IShellMetric iMetric = _registeredShellMetrics.get(name);
+
+ //get paramList
+ Object paramsObj = action.get("params");
+
+ Class<? extends IShellMetric> oriClass = iMetric.getClass();
+ Method method = null;
+ try {
+ method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
+ } catch (SecurityException e) {
+ LOG.error("handleMetrics get method ["+name+"] SecurityException");
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
+ throw new RuntimeException(e);
+ }
+ try {
+ method.invoke(iMetric, paramsObj);
+ } catch (IllegalArgumentException e) {
+ LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
+ throw new RuntimeException(e);
+ }
+ }
private void querySubprocess(Object query) {
try {
@@ -120,12 +170,23 @@ public class ShellSpout implements ISpout {
} else {
_collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
}
+ } else if (command.equals("metrics")) {
+ handleMetrics(action);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+
+ public <T extends IShellMetric> T registerMetric(String name, T metric) {
+ if ( _registeredShellMetrics.containsKey(name) ) {
+ throw new RuntimeException("The same metric name `" + name + "` was registered in ShellSpout twice." );
+ } else {
+ _registeredShellMetrics.put(name, metric);
+ }
+ return metric;
+ }
@Override
public void activate() {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 118e90e..e6cbe5e 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,11 +19,14 @@ package backtype.storm.task;
import backtype.storm.Config;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellProcess;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
@@ -79,6 +82,8 @@ public class ShellBolt implements IBolt {
private Thread _readerThread;
private Thread _writerThread;
+ private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
+
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -127,6 +132,8 @@ public class ShellBolt implements IBolt {
LOG.info("Shell msg: " + msg);
} else if (command.equals("emit")) {
handleEmit(action);
+ } else if (command.equals("metrics")) {
+ handleMetrics(action);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@ -186,6 +193,15 @@ public class ShellBolt implements IBolt {
_inputs.clear();
}
+ public <T extends IShellMetric> T registerMetric(String name, T metric) {
+ if ( _registeredShellMetrics.containsKey(name) ) {
+ throw new RuntimeException("The same metric name `" + name + "` was registered in ShellBolt twice." );
+ } else {
+ _registeredShellMetrics.put(name, metric);
+ }
+ return metric;
+ }
+
private void handleAck(Map action) {
String id = (String) action.get("id");
Tuple acked = _inputs.remove(id);
@@ -238,6 +254,49 @@ public class ShellBolt implements IBolt {
_collector.emitDirect((int)task.longValue(), stream, anchors, tuple);
}
}
+
+ private void handleMetrics(Map action) {
+ //get metrics
+ Object nameObj = action.get("name");
+ if ( !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is not String");
+ }
+ String name = (String) nameObj;
+ if (name == null || name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is NULL");
+ }
+ if ( !_registeredShellMetrics.containsKey(name)) {
+ throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
+ }
+ IShellMetric iMetric = _registeredShellMetrics.get(name);
+
+ //get paramList
+ Object paramsObj = action.get("params");
+
+ Class<? extends IShellMetric> oriClass = iMetric.getClass();
+ Method method = null;
+ try {
+ method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
+ } catch (SecurityException e) {
+ LOG.error("handleMetrics get method ["+name+"] SecurityException");
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
+ throw new RuntimeException(e);
+ }
+ try {
+ method.invoke(iMetric, paramsObj);
+ } catch (IllegalArgumentException e) {
+ LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
+ throw new RuntimeException(e);
+ }
+ }
private void die(Throwable exception) {
_exception = exception;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index c9df979..1285739 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -26,6 +26,8 @@ import backtype.storm.metric.api.IReducer;
import backtype.storm.metric.api.ICombiner;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
+import backtype.storm.spout.ShellSpout;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
@@ -250,6 +252,22 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
return metric;
}
+
+ /*
+ * Convinience method for ShellBolt to registering ShellMetric.
+ */
+ public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellBolt bolt) {
+ bolt.registerMetric(name, metric);
+ return registerMetric(name, metric, timeBucketSizeInSecs);
+ }
+
+ /*
+ * Convinience method for ShellSpout to registering ShellMetric.
+ */
+ public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellSpout spout) {
+ spout.registerMetric(name, metric);
+ return registerMetric(name, metric, timeBucketSizeInSecs);
+ }
/*
* Convinience method for registering ReducedMetric.
@@ -263,4 +281,4 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7a21ae00/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index bec3f0c..c0387b4 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -135,6 +135,9 @@ def reportError(msg):
def log(msg):
sendMsgToParent({"command": "log", "msg": msg})
+
+def rpcMetrics(name, params):
+ sendMsgToParent({"command": "metrics", "name": name, "params": params})
def initComponent():
setupInfo = readMsg()
[23/24] git commit: Merge branch 'master' of
https://github.com/dashengju/incubator-storm into STORM-200
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/dashengju/incubator-storm into STORM-200
STORM-200: Proposal for Multilang's Metrics feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ff345c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ff345c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ff345c1f
Branch: refs/heads/master
Commit: ff345c1fa9dcbe55e96037bec3b59d06c3f64cd4
Parents: 18a0721 573c42a
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 1 15:23:36 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 1 15:23:36 2014 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/testing.clj | 5 +-
.../src/dev/resources/tester_bolt_metrics.py | 35 ++++
.../src/dev/resources/tester_spout_metrics.py | 51 +++++
.../metric/api/rpc/AssignableShellMetric.java | 30 +++
.../metric/api/rpc/CombinedShellMetric.java | 31 +++
.../storm/metric/api/rpc/CountShellMetric.java | 38 ++++
.../storm/metric/api/rpc/IShellMetric.java | 31 +++
.../metric/api/rpc/ReducedShellMetric.java | 32 +++
.../storm/multilang/JsonSerializer.java | 10 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 20 ++
.../jvm/backtype/storm/spout/ShellSpout.java | 37 ++++
.../src/jvm/backtype/storm/task/ShellBolt.java | 38 ++++
.../backtype/storm/task/TopologyContext.java | 24 +++
.../storm/testing/PythonShellMetricsBolt.java | 32 +++
.../storm/testing/PythonShellMetricsSpout.java | 35 ++++
storm-core/src/multilang/py/storm.py | 3 +
.../test/clj/backtype/storm/metrics_test.clj | 206 ++++++++++++-------
17 files changed, 588 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ff345c1f/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 532545f,5ab327e..430581d
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@@ -115,9 -123,11 +123,11 @@@ public class ShellBolt implements IBol
handleError(shellMsg.getMsg());
} else if (command.equals("log")) {
String msg = shellMsg.getMsg();
- LOG.info("Shell msg: " + msg);
+ LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
} else if (command.equals("emit")) {
handleEmit(shellMsg);
+ } else if (command.equals("metrics")) {
+ handleMetrics(shellMsg);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@@ -224,10 -233,36 +234,38 @@@
shellMsg.getStream(), anchors, shellMsg.getTuple());
}
}
+
+ private void handleMetrics(ShellMsg shellMsg) {
+ //get metric name
+ String name = shellMsg.getMetricName();
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
+
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Could not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = shellMsg.getMetricParams();
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
private void die(Throwable exception) {
- _exception = exception;
+ String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+ _exception = new RuntimeException(processInfo, exception);
}
+
}
[05/24] git commit: remove no use import
Posted by bo...@apache.org.
remove no use import
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/db709fe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/db709fe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/db709fe3
Branch: refs/heads/master
Commit: db709fe3eeabd98e5b44d3f5d84c1b5cfe74445a
Parents: 98a2720
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:13:55 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:13:55 2014 +0800
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 3 ---
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 2 --
2 files changed, 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db709fe3/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index c6443e4..f54cf72 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,10 +25,7 @@ import backtype.storm.utils.ShellProcess;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db709fe3/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index b2bdc22..de3c3f2 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -26,8 +26,6 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellProcess;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
[03/24] git commit: add AssignableShellMetric.java to rpc
Posted by bo...@apache.org.
add AssignableShellMetric.java to rpc
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5448955d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5448955d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5448955d
Branch: refs/heads/master
Commit: 5448955dba2ffee133b1bd8b4d144611fcdcc751
Parents: ecb39a7
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:03:09 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:03:09 2014 +0800
----------------------------------------------------------------------
.../storm/metric/api/rpc/AssignableShellMetric.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5448955d/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
new file mode 100644
index 0000000..e0722b8
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
@@ -0,0 +1,13 @@
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.AssignableMetric;
+
+public class AssignableShellMetric extends AssignableMetric implements IShellMetric {
+ public AssignableShellMetric(Object value) {
+ super(value);
+ }
+
+ public void updateMetricFromRPC(Object value) {
+ setValue(value);
+ }
+}
[21/24] git commit: Merge branch 'master' of
github.com:dashengju/incubator-storm
Posted by bo...@apache.org.
Merge branch 'master' of github.com:dashengju/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b004e064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b004e064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b004e064
Branch: refs/heads/master
Commit: b004e064706dc2ee2c90c54325b9f770c6c116ef
Parents: 4bab2b8 d854ecc
Author: dashengju <da...@qq.com>
Authored: Fri Jun 20 10:35:07 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Jun 20 10:35:07 2014 +0800
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 2 +-
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[13/24] git commit: change log format
Posted by bo...@apache.org.
change log format
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4d6a27f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4d6a27f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4d6a27f4
Branch: refs/heads/master
Commit: 4d6a27f466b942cd011efb0ffd538f0c04daf047
Parents: 0427e06
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Jun 12 17:46:08 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Jun 12 17:46:08 2014 +0800
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 67 ++++++++++++++++----
.../src/clj/backtype/storm/daemon/task.clj | 4 +-
2 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4d6a27f4/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 1bbe53d..64e60be 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -18,6 +18,7 @@
(:use [backtype.storm bootstrap])
(:import [backtype.storm.hooks ITaskHook])
(:import [backtype.storm.tuple Tuple])
+ (:import [backtype.storm.tuple MessageId])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -389,12 +390,25 @@
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
task-ids (:task-ids executor-data)
debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
+ component-id (:component-id executor-data)
+ executor-id (:executor-id executor-data)
+ executor-type (:type executor-data)
]
(disruptor/clojure-handler
(fn [tuple-batch sequence-id end-of-batch?]
(fast-list-iter [[task-id msg] tuple-batch]
- (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
- (when debug? (log-message "Processing received message " tuple))
+ (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))
+ tuple-streamid (.getSourceStreamId tuple)
+ tuple-source (.getSourceComponent tuple)
+ tuple-id (.getMessageId tuple)
+ tuple-values (.getValues tuple)
+ ]
+ (when debug?
+ (if (= tuple-streamid "default")
+ (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-id "] TupleValue[" tuple-values "]")
+ (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-values "]")
+ )
+ )
(if task-id
(tuple-action-fn task-id tuple)
;; null task ids are broadcast tuples
@@ -421,6 +435,7 @@
last-active (atom false)
spouts (ArrayList. (map :object (vals task-datas)))
rand (Random. (Utils/secureRandomLong))
+ debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
@@ -428,9 +443,12 @@
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
+ (when debug?
+ (log-message "Component[" component-id "] FAILED-TUPLE reason[EXPIRED] TupleId[" msg-id "] values[" tuple-info "]"))
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
- (let [stream-id (.getSourceStreamId tuple)]
+ (let [stream-id (.getSourceStreamId tuple)
+ tuple-id (.getMessageId tuple)]
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
@@ -441,10 +459,18 @@
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id
- ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
- ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
+ ACKER-ACK-STREAM-ID (do
+ (ack-spout-msg executor-data (get task-datas task-id)
+ spout-id tuple-finished-info time-delta)
+ (when debug?
+ (log-message "Component[" component-id "] ACK-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
+ )
+ ACKER-FAIL-STREAM-ID (do
+ (fail-spout-msg executor-data (get task-datas task-id)
+ spout-id tuple-finished-info time-delta)
+ (when debug?
+ (log-message "Component[" component-id "] FAILED-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
+ )
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -493,6 +519,8 @@
(transfer-fn out-task
out-tuple
overflow-buffer)
+ (when debug?
+ (log-message "Component[" component-id "] Type[EMIT] to Stream[" out-stream-id "] TupleId[" tuple-id "] values[" values "]"))
))
(if rooted?
(do
@@ -598,6 +626,8 @@
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
+ debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
+
tuple-action-fn (fn [task-id ^TupleImpl tuple]
;; synchronization needs to be done with a key provided by this bolt, otherwise:
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -660,7 +690,13 @@
(tasks-fn task stream values)
(tasks-fn stream values))]
(fast-list-iter [t out-tasks]
- (let [anchors-to-ids (HashMap.)]
+ (let [anchors-to-ids (HashMap.)
+ out-tuple (TupleImpl. worker-context
+ values
+ task-id
+ stream
+ (MessageId/makeId anchors-to-ids))
+ ]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
@@ -669,12 +705,15 @@
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
- (transfer-fn t
- (TupleImpl. worker-context
- values
- task-id
- stream
- (MessageId/makeId anchors-to-ids)))))
+ (transfer-fn t out-tuple)
+ (when debug?
+ (if (= component-id "__acker")
+ (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.get values 0) "]")
+ (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.getMessageId out-tuple) "] values[" values "]")
+ )
+
+ )
+ ))
(or out-tasks [])))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(if (= component-id Constants/SYSTEM_COMPONENT_ID)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4d6a27f4/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 3650150..29756a1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -131,7 +131,7 @@
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
- (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
+ (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
(let [target-component (.getComponentId worker-context out-task-id)
component->grouping (get stream->component->grouper stream)
grouping (get component->grouping target-component)
@@ -149,7 +149,7 @@
))
([^String stream ^List values]
(when debug?
- (log-message "Emitting: " component-id " " stream " " values))
+ (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
(let [out-tasks (ArrayList.)]
(fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
(when (= :direct grouper)
[20/24] git commit: change metrics_test use
simulated-time-local-cluster and modify the hang problem with assert-buckets
Posted by bo...@apache.org.
change metrics_test use simulated-time-local-cluster and modify the hang problem with assert-buckets
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4bab2b85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4bab2b85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4bab2b85
Branch: refs/heads/master
Commit: 4bab2b857d0d481d1859336adeb87d7245c6e1c3
Parents: ecd0adb
Author: dashengju <da...@qq.com>
Authored: Fri Jun 20 10:34:38 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Jun 20 10:34:38 2014 +0800
----------------------------------------------------------------------
.../src/dev/resources/tester_spout_metrics.py | 12 +-
.../test/clj/backtype/storm/metrics_test.clj | 156 ++++++++++---------
2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/src/dev/resources/tester_spout_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout_metrics.py b/storm-core/src/dev/resources/tester_spout_metrics.py
index a91128a..0480566 100644
--- a/storm-core/src/dev/resources/tester_spout_metrics.py
+++ b/storm-core/src/dev/resources/tester_spout_metrics.py
@@ -32,16 +32,14 @@ class TesterSpout(storm.Spout):
self.count = 0
def nextTuple(self):
- sleep(1)
- storm.log("TesterSpout emit a tuple")
- word = choice(words)
- id = str(uuid4())
- self.pending[id] = word
if self.count < 2:
+ word = choice(words)
+ id = str(uuid4())
+ self.pending[id] = word
storm.rpcMetrics("my-custom-shellspout-metric", 1)
- storm.log("TesterSpout update my-custom-shellspout-metric")
self.count = self.count + 1
- storm.emit([word], id=id)
+ storm.log("TesterSpout update my-custom-shellspout-metric "+str(self.count))
+ storm.emit([word], id=id)
def ack(self, id):
del self.pending[id]
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index 4aaa3dc..4356d30 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -70,15 +70,17 @@
(def metrics-data backtype.storm.metric.testing/buffer)
-(defn wait-for-atleast-N-buckets! [N comp-id metric-name]
- (while
+(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
+ (while-timeout TEST-TIMEOUT-MS
(let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
(or
(and (not= N 0) (nil? taskid->buckets))
(not-every? #(<= N %) (map (comp count second) taskid->buckets))))
-;; (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
-;; "and metric name" metric-name)
- (Thread/sleep 10)))
+ ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
+ (if cluster
+ (advance-cluster-time cluster 1)
+ (Thread/sleep 10))))
+
(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
(-> @metrics-data
@@ -88,10 +90,10 @@
(second)
(or [])))
-(defmacro assert-buckets! [comp-id metric-name expected]
+(defmacro assert-buckets! [comp-id metric-name expected cluster]
`(do
(let [N# (count ~expected)]
- (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
+ (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster)
(is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
(defmacro assert-metric-data-exists! [comp-id metric-name]
@@ -112,18 +114,18 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
- (assert-buckets! "2" "my-custom-metric" [1])
+ (assert-buckets! "2" "my-custom-metric" [1] cluster)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(deftest test-custom-metric-with-multi-tasks
(with-simulated-time-local-cluster
@@ -140,18 +142,18 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
- (assert-buckets! "2" "my-custom-metric" [1])
+ (assert-buckets! "2" "my-custom-metric" [1] cluster)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(defn mk-shell-bolt-with-metrics-spec
[inputs command & kwargs]
@@ -160,7 +162,7 @@
(PythonShellMetricsBolt. command) kwargs)))
(deftest test-custom-metric-with-multilang-py
- (with-local-cluster
+ (with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
@@ -173,19 +175,19 @@
(submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
(.feed feeder ["a"] 1)
- (Thread/sleep 6000)
- (assert-buckets! "2" "my-custom-shell-metric" [1])
+ (advance-cluster-time cluster 6)
+ (assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
- (Thread/sleep 5000)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0])
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
- (Thread/sleep 20000)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0])
+ (advance-cluster-time cluster 20)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (Thread/sleep 5000)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2])
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
)))
(defn mk-shell-spout-with-metrics-spec
@@ -194,7 +196,7 @@
(apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
(deftest test-custom-metric-with-spout-multilang-py
- (with-local-cluster
+ (with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
@@ -204,8 +206,8 @@
{"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
(submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
- (Thread/sleep 7000)
- (assert-buckets! "1" "my-custom-shellspout-metric" [2])
+ (advance-cluster-time cluster 7)
+ (assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
)))
@@ -223,27 +225,27 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 61)
- (assert-buckets! "myspout" "__ack-count/default" [1])
- (assert-buckets! "myspout" "__emit-count/default" [1])
- (assert-buckets! "myspout" "__transfer-count/default" [1])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+ (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
(advance-cluster-time cluster 120)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0])
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0] cluster)
(.feed feeder ["b"] 1)
(.feed feeder ["c"] 1)
(advance-cluster-time cluster 60)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2])
- (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2])
- (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2]))))
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2] cluster))))
(deftest test-builtin-metrics-2
@@ -266,36 +268,36 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
(assert-acked tracker 1)
- (assert-buckets! "myspout" "__fail-count/default" [])
- (assert-buckets! "myspout" "__ack-count/default" [1])
- (assert-buckets! "myspout" "__emit-count/default" [1])
- (assert-buckets! "myspout" "__transfer-count/default" [1])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+ (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+ (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
(.feed feeder ["b"] 2)
(advance-cluster-time cluster 5)
- (assert-buckets! "myspout" "__fail-count/default" [])
- (assert-buckets! "myspout" "__ack-count/default" [1 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 1])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
+ (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+ (assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
(advance-cluster-time cluster 15)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 15)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
(deftest test-builtin-metrics-3
(with-simulated-time-local-cluster
@@ -319,21 +321,21 @@
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 9)
(assert-acked tracker 1 3)
- (assert-buckets! "myspout" "__ack-count/default" [2])
- (assert-buckets! "myspout" "__emit-count/default" [3])
- (assert-buckets! "myspout" "__transfer-count/default" [3])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+ (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
(is (not (.isFailed tracker 2)))
(advance-cluster-time cluster 30)
(assert-failed tracker 2)
- (assert-buckets! "myspout" "__fail-count/default" [1])
- (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0]))))
+ (assert-buckets! "myspout" "__fail-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
(deftest test-system-bolt
(with-simulated-time-local-cluster
@@ -348,12 +350,12 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 70)
- (assert-buckets! "__system" "newWorkerEvent" [1])
+ (assert-buckets! "__system" "newWorkerEvent" [1] cluster)
(assert-metric-data-exists! "__system" "uptimeSecs")
(assert-metric-data-exists! "__system" "startTimeSecs")
(advance-cluster-time cluster 180)
- (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0])
+ (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
)))
[06/24] git commit: remove TopologyContext.java unused import
Posted by bo...@apache.org.
remove TopologyContext.java unused import
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e2a7f7c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e2a7f7c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e2a7f7c0
Branch: refs/heads/master
Commit: e2a7f7c0e1a5a413840948342f4be86d9ba56516
Parents: db709fe
Author: dashengju <da...@qq.com>
Authored: Fri Mar 7 09:48:58 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Mar 7 09:48:58 2014 +0800
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/task/TopologyContext.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e2a7f7c0/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index c0fcf20..dfbad0c 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -26,8 +26,6 @@ import backtype.storm.metric.api.IReducer;
import backtype.storm.metric.api.ICombiner;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.rpc.IShellMetric;
-import backtype.storm.spout.ShellSpout;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
[12/24] git commit: remove ui topology action
Posted by bo...@apache.org.
remove ui topology action
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0427e06e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0427e06e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0427e06e
Branch: refs/heads/master
Commit: 0427e06e52776b1f4e4ad299d198ee7a1384bae6
Parents: b3191fc
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 11 23:00:02 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 11 23:00:02 2014 +0800
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/ui/core.clj | 74 +++++++++++-----------
storm-core/src/ui/public/topology.html | 6 +-
storm-dist/binary/pom.xml | 2 +-
3 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0427e06e/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 5f2bcba..4098038 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -780,43 +780,43 @@
(let [id (url-decode id)
component (url-decode component)]
(json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
- (POST "/api/v1/topology/:id/activate" [id]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)]
- (.activate nimbus name)
- (log-message "Activating topology '" name "'")))
- (resp/redirect (str "/api/v1/topology/" id)))
-
- (POST "/api/v1/topology/:id/deactivate" [id]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)]
- (.deactivate nimbus name)
- (log-message "Deactivating topology '" name "'")))
- (resp/redirect (str "/api/v1/topology/" id)))
- (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)
- options (RebalanceOptions.)]
- (.set_wait_secs options (Integer/parseInt wait-time))
- (.rebalance nimbus name options)
- (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
- (resp/redirect (str "/api/v1/topology/" id)))
- (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)
- options (KillOptions.)]
- (.set_wait_secs options (Integer/parseInt wait-time))
- (.killTopologyWithOpts nimbus name options)
- (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
- (resp/redirect (str "/api/v1/topology/" id)))
+ ;(POST "/api/v1/topology/:id/activate" [id]
+ ; (with-nimbus nimbus
+ ; (let [id (url-decode id)
+ ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ ; name (.get_name tplg)]
+ ; (.activate nimbus name)
+ ; (log-message "Activating topology '" name "'")))
+ ; (resp/redirect (str "/api/v1/topology/" id)))
+
+ ;(POST "/api/v1/topology/:id/deactivate" [id]
+ ; (with-nimbus nimbus
+ ; (let [id (url-decode id)
+ ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ ; name (.get_name tplg)]
+ ; (.deactivate nimbus name)
+ ; (log-message "Deactivating topology '" name "'")))
+ ; (resp/redirect (str "/api/v1/topology/" id)))
+ ;(POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+ ; (with-nimbus nimbus
+ ; (let [id (url-decode id)
+ ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ ; name (.get_name tplg)
+ ; options (RebalanceOptions.)]
+ ; (.set_wait_secs options (Integer/parseInt wait-time))
+ ; (.rebalance nimbus name options)
+ ; (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+ ; (resp/redirect (str "/api/v1/topology/" id)))
+ ;(POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+ ; (with-nimbus nimbus
+ ; (let [id (url-decode id)
+ ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ ; name (.get_name tplg)
+ ; options (KillOptions.)]
+ ; (.set_wait_secs options (Integer/parseInt wait-time))
+ ; (.killTopologyWithOpts nimbus name options)
+ ; (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+ ; (resp/redirect (str "/api/v1/topology/" id)))
(GET "/" [:as {cookies :cookies}]
(resp/redirect "/index.html"))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0427e06e/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index df095ad..9cb08cd 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -19,11 +19,13 @@
<h2>Topology summary</h2>
<div id="topology-summary">
</div>
+<!--
<div id="topology-actions">
<h2 class="js-only">Topology actions</h2>
<p id="topology-actions" class="js-only">
</p>
</div>
+-->
<div id="topology-stats"></div>
<div id="spout-stats">
</div>
@@ -60,13 +62,13 @@ $(document).ready(function() {
var spoutStats = $("#spout-stats");
var boltStats = $("#bolt-stats");
var config = $("#topology-configuration");
- var topologyActions = $("#topology-actions");
+ //var topologyActions = $("#topology-actions");
var topologyVisualization = $("#topology-visualization")
var formattedConfig = formatConfigData(response["configuration"]);
var buttonJsonData = topologyActionJson(response["id"],response["name"],response["status"],response["msgTimeout"]);
$.get("/templates/topology-page-template.html", function(template) {
topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
- topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
+ //topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response));
$("#topology-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
spoutStats.append(Mustache.render($(template).filter("#spout-stats-template").html(),response));
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0427e06e/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 4fa3a27..bb1b876 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating-mt0000</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
[04/24] git commit: add AssignableShellMetric.java to rpc
Posted by bo...@apache.org.
add AssignableShellMetric.java to rpc
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/98a27203
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/98a27203
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/98a27203
Branch: refs/heads/master
Commit: 98a27203169abf2d7a2bcad163448c488d8d0d40
Parents: 5448955
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:04:57 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:04:57 2014 +0800
----------------------------------------------------------------------
.../metric/api/rpc/AssignableShellMetric.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/98a27203/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
index e0722b8..20387ed 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package backtype.storm.metric.api.rpc;
import backtype.storm.metric.api.AssignableMetric;
[18/24] git commit: Merge branch 'master' of
github.com:dashengju/incubator-storm
Posted by bo...@apache.org.
Merge branch 'master' of github.com:dashengju/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e33023c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e33023c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e33023c7
Branch: refs/heads/master
Commit: e33023c7e7f5ec10cbc2d5eade7d68102bfbb317
Parents: 62f854a ecd0adb
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 18 14:29:31 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 18 14:29:31 2014 +0800
----------------------------------------------------------------------
.../src/dev/resources/tester_bolt_metrics.py | 35 +++++++++
.../src/dev/resources/tester_spout_metrics.py | 53 ++++++++++++++
.../storm/testing/PythonShellMetricsBolt.java | 32 +++++++++
.../storm/testing/PythonShellMetricsSpout.java | 35 +++++++++
.../test/clj/backtype/storm/metrics_test.clj | 74 ++++++++++++++++++--
5 files changed, 225 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[10/24] git commit: support metrics and cglimit
Posted by bo...@apache.org.
support metrics and cglimit
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/fbaf7278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/fbaf7278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/fbaf7278
Branch: refs/heads/master
Commit: fbaf7278940925f27e24f436d1917aee28b23d3c
Parents: 0279d72
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 11 16:35:38 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 11 16:35:38 2014 +0800
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 4 ++-
.../storm/multilang/JsonSerializer.java | 10 +++++++
.../jvm/backtype/storm/multilang/ShellMsg.java | 20 ++++++++++++++
.../jvm/backtype/storm/spout/ShellSpout.java | 12 +++-----
.../src/jvm/backtype/storm/task/ShellBolt.java | 29 +++++---------------
storm-core/src/multilang/py/storm.py | 2 +-
6 files changed, 45 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 556653a..53b2802 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -473,8 +473,10 @@
topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
(substitute-worker-childopts s port))
logfilename (str "worker-" port ".log")
+ worker-childcgroup (when-let [s (conf WORKER-CHILDCGROUP)]
+ (.split s " "))
command (concat
- (conf WORKER-CHILDCGROUP)
+ worker-childcgroup
[(java-cmd) "-server"]
worker-childopts
topo-worker-childopts
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index dd5773f..48405e0 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -118,6 +118,16 @@ public class JsonSerializer implements ISerializer {
shellMsg.addAnchor((String) o);
}
}
+
+ Object nameObj = msg.get("name");
+ String metricName = null;
+ if (nameObj != null && nameObj instanceof String) {
+ metricName = (String) nameObj;
+ }
+ shellMsg.setMetricName(metricName);
+
+ Object paramsObj = msg.get("params");
+ shellMsg.setMetricParams(paramsObj);
return shellMsg;
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
index 1747f5b..ef87536 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -25,6 +25,10 @@ public class ShellMsg {
private List<Object> tuple;
private boolean needTaskIds;
+ //metrics rpc
+ private String metricName;
+ private Object metricParams;
+
public String getCommand() {
return command;
}
@@ -102,4 +106,20 @@ public class ShellMsg {
public void setNeedTaskIds(boolean needTaskIds) {
this.needTaskIds = needTaskIds;
}
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public String getMetricName() {
+ return this.metricName;
+ }
+
+ public void setMetricParams(Object metricParams) {
+ this.metricParams = metricParams;
+ }
+
+ public Object getMetricParams() {
+ return metricParams;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 368f16a..9f4b38e 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -93,13 +93,9 @@ public class ShellSpout implements ISpout {
querySubprocess();
}
- private void handleMetrics(Map action) {
+ private void handleMetrics(ShellMsg shellMsg) {
//get metric name
- Object nameObj = action.get("name");
- if (nameObj == null || !(nameObj instanceof String) ) {
- throw new RuntimeException("Receive Metrics name is null or is not String");
- }
- String name = (String) nameObj;
+ String name = shellMsg.getMetricName();
if (name.isEmpty()) {
throw new RuntimeException("Receive Metrics name is empty");
}
@@ -115,7 +111,7 @@ public class ShellSpout implements ISpout {
IShellMetric iShellMetric = (IShellMetric)iMetric;
//call updateMetricFromRPC with params
- Object paramsObj = action.get("params");
+ Object paramsObj = shellMsg.getMetricParams();
try {
iShellMetric.updateMetricFromRPC(paramsObj);
} catch (RuntimeException re) {
@@ -151,7 +147,7 @@ public class ShellSpout implements ISpout {
_collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
}
} else if (command.equals("metrics")) {
- handleMetrics(action);
+ handleMetrics(shellMsg);
} else {
throw new RuntimeException("Unknown command received: " + command);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 50d6e20..b02a7fd 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -98,11 +98,10 @@ public class ShellBolt implements IBolt {
}
_rand = new Random();
_collector = collector;
-<<<<<<< HEAD
+
_context = context;
-=======
+
_process = new ShellProcess(_command);
->>>>>>> upstream/master
//subprocesses must send their pid first thing
Number subpid = _process.launch(stormConf, context);
@@ -126,13 +125,9 @@ public class ShellBolt implements IBolt {
String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
} else if (command.equals("emit")) {
-<<<<<<< HEAD
- handleEmit(action);
- } else if (command.equals("metrics")) {
- handleMetrics(action);
-=======
handleEmit(shellMsg);
->>>>>>> upstream/master
+ } else if (command.equals("metrics")) {
+ handleMetrics(shellMsg);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@ -194,14 +189,8 @@ public class ShellBolt implements IBolt {
_process.destroy();
_inputs.clear();
}
-<<<<<<< HEAD
-
- private void handleAck(Map action) {
- String id = (String) action.get("id");
-=======
private void handleAck(Object id) {
->>>>>>> upstream/master
Tuple acked = _inputs.remove(id);
if(acked==null) {
throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
@@ -245,13 +234,9 @@ public class ShellBolt implements IBolt {
}
}
- private void handleMetrics(Map action) {
+ private void handleMetrics(ShellMsg shellMsg) {
//get metric name
- Object nameObj = action.get("name");
- if (nameObj == null || !(nameObj instanceof String) ) {
- throw new RuntimeException("Receive Metrics name is null or is not String");
- }
- String name = (String) nameObj;
+ String name = shellMsg.getMetricName();
if (name.isEmpty()) {
throw new RuntimeException("Receive Metrics name is empty");
}
@@ -267,7 +252,7 @@ public class ShellBolt implements IBolt {
IShellMetric iShellMetric = (IShellMetric)iMetric;
//call updateMetricFromRPC with params
- Object paramsObj = action.get("params");
+ Object paramsObj = shellMsg.getMetricParams();
try {
iShellMetric.updateMetricFromRPC(paramsObj);
} catch (RuntimeException re) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fbaf7278/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index 5de3c0d..a4c8c2c 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -33,7 +33,7 @@ json_decode = lambda x: json.loads(x)
def readMsg():
msg = ""
while True:
- line = sys.stdin.readline()[0:-1]
+ line = sys.stdin.readline()
if not line:
raise Exception('Read EOF from stdin')
if line[0:-1] == "end":
[02/24] git commit: modify java refelction
Posted by bo...@apache.org.
modify java refelction
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ecb39a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ecb39a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ecb39a70
Branch: refs/heads/master
Commit: ecb39a70f157a5d08a0fb90d0d3319b1c9a71ceb
Parents: 7a21ae0
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Mar 6 17:01:50 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Mar 6 17:01:50 2014 +0800
----------------------------------------------------------------------
.../metric/api/rpc/CombinedShellMetric.java | 14 ++--
.../storm/metric/api/rpc/CountShellMetric.java | 18 ++---
.../storm/metric/api/rpc/IShellMetric.java | 8 +-
.../metric/api/rpc/ReducedShellMetric.java | 12 +--
.../jvm/backtype/storm/spout/ShellSpout.java | 84 ++++++++------------
.../src/jvm/backtype/storm/task/ShellBolt.java | 84 ++++++++------------
.../backtype/storm/task/TopologyContext.java | 38 +++++----
7 files changed, 113 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
index fd940a7..231c571 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -21,13 +21,11 @@ import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.ICombiner;
public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+ public CombinedShellMetric(ICombiner combiner) {
+ super(combiner);
+ }
- public CombinedShellMetric(ICombiner combiner) {
- super(combiner);
- }
-
- public void updateMetricFromRPC(Object value) {
- update(value);
- }
-
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
index 1779223..def74c2 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -26,13 +26,13 @@ public class CountShellMetric extends CountMetric implements IShellMetric {
* if value is null, it will call incr()
* if value is long, it will call incrBy((long)params)
* */
- public void updateMetricFromRPC(Object value) {
- if (value == null) {
- incr();
- } else if (value instanceof Long) {
- incrBy((Long)value);
- } else {
- throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
- }
- }
+ public void updateMetricFromRPC(Object value) {
+ if (value == null) {
+ incr();
+ } else if (value instanceof Long) {
+ incrBy((Long)value);
+ } else {
+ throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
index 9bec3a1..d53baea 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -20,14 +20,12 @@ package backtype.storm.metric.api.rpc;
import backtype.storm.metric.api.IMetric;
public interface IShellMetric extends IMetric {
- public static final String SHELL_METRICS_UPDATE_METHOD_NAME = "updateMetricFromRPC";
-
/***
* @function
- * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric
+ * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric
* @param
- * value used to update metric, its's meaning change according implementation
- * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
+ * value used to update metric, its's meaning change according implementation
+ * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
* */
public void updateMetricFromRPC(Object value);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
index 727a709..097ed51 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -22,11 +22,11 @@ import backtype.storm.metric.api.ReducedMetric;
public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
- public ReducedShellMetric(IReducer reducer) {
- super(reducer);
- }
+ public ReducedShellMetric(IReducer reducer) {
+ super(reducer);
+ }
- public void updateMetricFromRPC(Object value) {
- update(value);
- }
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index e0abe8a..c6443e4 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,6 +18,7 @@
package backtype.storm.spout;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
@@ -41,8 +42,8 @@ public class ShellSpout implements ISpout {
private String[] _command;
private ShellProcess _process;
- private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
-
+ private TopologyContext _context;
+
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -55,6 +56,7 @@ public class ShellSpout implements ISpout {
SpoutOutputCollector collector) {
_process = new ShellProcess(_command);
_collector = collector;
+ _context = context;
try {
Number subpid = _process.launch(stormConf, context);
@@ -101,46 +103,35 @@ public class ShellSpout implements ISpout {
}
private void handleMetrics(Map action) {
- //get metrics
- Object nameObj = action.get("name");
- if ( !(nameObj instanceof String) ) {
- throw new RuntimeException("Receive Metrics name is not String");
- }
- String name = (String) nameObj;
- if (name == null || name.isEmpty()) {
- throw new RuntimeException("Receive Metrics name is NULL");
- }
- if ( !_registeredShellMetrics.containsKey(name)) {
- throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
- }
- IShellMetric iMetric = _registeredShellMetrics.get(name);
-
- //get paramList
- Object paramsObj = action.get("params");
+ //get metric name
+ Object nameObj = action.get("name");
+ if (nameObj == null || !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is null or is not String");
+ }
+ String name = (String) nameObj;
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
- Class<? extends IShellMetric> oriClass = iMetric.getClass();
- Method method = null;
- try {
- method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
- } catch (SecurityException e) {
- LOG.error("handleMetrics get method ["+name+"] SecurityException");
- throw new RuntimeException(e);
- } catch (NoSuchMethodException e) {
- LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
- throw new RuntimeException(e);
- }
- try {
- method.invoke(iMetric, paramsObj);
- } catch (IllegalArgumentException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
- throw new RuntimeException(e);
- }
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = action.get("params");
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private void querySubprocess(Object query) {
@@ -171,22 +162,13 @@ public class ShellSpout implements ISpout {
_collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
}
} else if (command.equals("metrics")) {
- handleMetrics(action);
+ handleMetrics(action);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
-
- public <T extends IShellMetric> T registerMetric(String name, T metric) {
- if ( _registeredShellMetrics.containsKey(name) ) {
- throw new RuntimeException("The same metric name `" + name + "` was registered in ShellSpout twice." );
- } else {
- _registeredShellMetrics.put(name, metric);
- }
- return metric;
- }
@Override
public void activate() {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index e6cbe5e..b2bdc22 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,6 +19,7 @@ package backtype.storm.task;
import backtype.storm.Config;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
@@ -81,9 +82,9 @@ public class ShellBolt implements IBolt {
private Thread _readerThread;
private Thread _writerThread;
-
- private Map<String, IShellMetric> _registeredShellMetrics = new ConcurrentHashMap<String, IShellMetric>();
+ private TopologyContext _context;
+
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -101,6 +102,7 @@ public class ShellBolt implements IBolt {
_rand = new Random();
_process = new ShellProcess(_command);
_collector = collector;
+ _context = context;
try {
//subprocesses must send their pid first thing
@@ -133,7 +135,7 @@ public class ShellBolt implements IBolt {
} else if (command.equals("emit")) {
handleEmit(action);
} else if (command.equals("metrics")) {
- handleMetrics(action);
+ handleMetrics(action);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@ -192,15 +194,6 @@ public class ShellBolt implements IBolt {
_process.destroy();
_inputs.clear();
}
-
- public <T extends IShellMetric> T registerMetric(String name, T metric) {
- if ( _registeredShellMetrics.containsKey(name) ) {
- throw new RuntimeException("The same metric name `" + name + "` was registered in ShellBolt twice." );
- } else {
- _registeredShellMetrics.put(name, metric);
- }
- return metric;
- }
private void handleAck(Map action) {
String id = (String) action.get("id");
@@ -256,46 +249,35 @@ public class ShellBolt implements IBolt {
}
private void handleMetrics(Map action) {
- //get metrics
- Object nameObj = action.get("name");
- if ( !(nameObj instanceof String) ) {
- throw new RuntimeException("Receive Metrics name is not String");
- }
- String name = (String) nameObj;
- if (name == null || name.isEmpty()) {
- throw new RuntimeException("Receive Metrics name is NULL");
- }
- if ( !_registeredShellMetrics.containsKey(name)) {
- throw new RuntimeException("Receive Metrics name:" + name + " does not reigster.");
- }
- IShellMetric iMetric = _registeredShellMetrics.get(name);
-
- //get paramList
- Object paramsObj = action.get("params");
+ //get metric name
+ Object nameObj = action.get("name");
+ if (nameObj == null || !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is null or is not String");
+ }
+ String name = (String) nameObj;
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
- Class<? extends IShellMetric> oriClass = iMetric.getClass();
- Method method = null;
- try {
- method = oriClass.getMethod(IShellMetric.SHELL_METRICS_UPDATE_METHOD_NAME, new Class[]{Object.class});
- } catch (SecurityException e) {
- LOG.error("handleMetrics get method ["+name+"] SecurityException");
- throw new RuntimeException(e);
- } catch (NoSuchMethodException e) {
- LOG.error("handleMetrics get method ["+name+"] NoSuchMethodException");
- throw new RuntimeException(e);
- }
- try {
- method.invoke(iMetric, paramsObj);
- } catch (IllegalArgumentException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalArgumentException");
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- LOG.error("handleMetrics invoke["+name+"] IllegalAccessException");
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- LOG.error("handleMetrics invoke["+name+"] InvocationTargetException");
- throw new RuntimeException(e);
- }
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = action.get("params");
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private void die(Throwable exception) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecb39a70/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 1285739..c0fcf20 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -252,23 +252,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
return metric;
}
-
- /*
- * Convinience method for ShellBolt to registering ShellMetric.
- */
- public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellBolt bolt) {
- bolt.registerMetric(name, metric);
- return registerMetric(name, metric, timeBucketSizeInSecs);
- }
-
- /*
- * Convinience method for ShellSpout to registering ShellMetric.
+
+ /**
+ * Get component's metric from registered metrics by name.
+ * Notice: Normally, one component can only register one metric name once.
+ * But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
+ * cause the same metric name can register twice.
+ * So we just return the first metric we meet.
*/
- public <T extends IShellMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs, ShellSpout spout) {
- spout.registerMetric(name, metric);
- return registerMetric(name, metric, timeBucketSizeInSecs);
- }
+ public IMetric getRegisteredMetricByName(String name) {
+ IMetric metric = null;
+ for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+ Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
+ if (nameToMetric != null) {
+ metric = nameToMetric.get(name);
+ if (metric != null) {
+ //we just return the first metric we meet
+ break;
+ }
+ }
+ }
+
+ return metric;
+ }
+
/*
* Convinience method for registering ReducedMetric.
*/
[08/24] git commit: add cgroup support & storm.py bug fix
Posted by bo...@apache.org.
add cgroup support & storm.py bug fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ead42379
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ead42379
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ead42379
Branch: refs/heads/master
Commit: ead423793d392959f41c2782a05ed23413bc84a3
Parents: bc39172
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 18:48:06 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 18:48:06 2014 +0800
----------------------------------------------------------------------
patch/STORM-132_PULL-36.patch | 31 +++++++++++++++++++++++++++++++
1 file changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ead42379/patch/STORM-132_PULL-36.patch
----------------------------------------------------------------------
diff --git a/patch/STORM-132_PULL-36.patch b/patch/STORM-132_PULL-36.patch
new file mode 100644
index 0000000..005ac26
--- /dev/null
+++ b/patch/STORM-132_PULL-36.patch
@@ -0,0 +1,31 @@
+From 6b275d95fbdbc8374a215ecb2551f0fca3438d81 Mon Sep 17 00:00:00 2001
+From: Kang Xiao <kx...@gmail.com>
+Date: Tue, 18 Feb 2014 23:23:50 +0800
+Subject: [PATCH] STORM-132 sort supervisor by free slot in desending order in
+ EvenScheduler to schedule more evenly between supervisor
+
+---
+ storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
+ 1 file changed, 6 insertions(+), 1 deletion(-)
+
+diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+index 28b9202..828606d 100644
+--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
++++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+@@ -22,7 +22,12 @@
+ :implements [backtype.storm.scheduler.IScheduler]))
+
+ (defn sort-slots [all-slots]
+- (let [split-up (vals (group-by first all-slots))]
++ (let [split-up
++ (map second
++ (reverse
++ (sort
++ (for [[host ports] (group-by first all-slots)]
++ [(count ports) ports]))))]
+ (apply interleave-all split-up)
+ ))
+
+--
+1.8.5.1
+
[17/24] git commit: change Multilang compitable with
storm-incubating-v0.9.3 and add unit tests
Posted by bo...@apache.org.
change Multilang compitable with storm-incubating-v0.9.3 and add unit tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ecd0adb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ecd0adb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ecd0adb8
Branch: refs/heads/master
Commit: ecd0adb812defd0f11f77b3d2e046a0adf787079
Parents: d448e34
Author: dashengju <da...@qq.com>
Authored: Wed Jun 18 13:52:27 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Wed Jun 18 13:52:27 2014 +0800
----------------------------------------------------------------------
.../src/dev/resources/tester_bolt_metrics.py | 35 +++++++++
.../src/dev/resources/tester_spout_metrics.py | 53 ++++++++++++++
.../metric/api/rpc/AssignableShellMetric.java | 30 ++++++++
.../metric/api/rpc/CombinedShellMetric.java | 31 ++++++++
.../storm/metric/api/rpc/CountShellMetric.java | 38 ++++++++++
.../storm/metric/api/rpc/IShellMetric.java | 31 ++++++++
.../metric/api/rpc/ReducedShellMetric.java | 32 +++++++++
.../storm/multilang/JsonSerializer.java | 10 +++
.../jvm/backtype/storm/multilang/ShellMsg.java | 20 ++++++
.../jvm/backtype/storm/spout/ShellSpout.java | 37 ++++++++++
.../src/jvm/backtype/storm/task/ShellBolt.java | 38 ++++++++++
.../backtype/storm/task/TopologyContext.java | 24 +++++++
.../storm/testing/PythonShellMetricsBolt.java | 32 +++++++++
.../storm/testing/PythonShellMetricsSpout.java | 35 +++++++++
storm-core/src/multilang/py/storm.py | 3 +
.../test/clj/backtype/storm/metrics_test.clj | 74 ++++++++++++++++++--
16 files changed, 519 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/dev/resources/tester_bolt_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_bolt_metrics.py b/storm-core/src/dev/resources/tester_bolt_metrics.py
new file mode 100644
index 0000000..13a2d18
--- /dev/null
+++ b/storm-core/src/dev/resources/tester_bolt_metrics.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This Python file uses the following encoding: utf-8
+
+import storm
+from random import random
+
+class TesterMetricsBolt(storm.Bolt):
+ def initialize(self, conf, context):
+ storm.log('bolt initializing')
+
+ def process(self, tup):
+ word = tup.values[0];
+
+ storm.rpcMetrics("my-custom-shell-metric", 1);
+
+ storm.ack(tup)
+
+TesterMetricsBolt().run()
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/dev/resources/tester_spout_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout_metrics.py b/storm-core/src/dev/resources/tester_spout_metrics.py
new file mode 100644
index 0000000..a91128a
--- /dev/null
+++ b/storm-core/src/dev/resources/tester_spout_metrics.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This Python file uses the following encoding: utf-8
+
+import storm
+from random import choice
+from time import sleep
+from uuid import uuid4
+
+words = [u"nathan", u"mike", u"jackson", u"golda", u"bertels人"]
+
+class TesterSpout(storm.Spout):
+ def initialize(self, conf, context):
+ storm.log('spout initializing')
+ self.pending = {}
+ self.count = 0
+
+ def nextTuple(self):
+ sleep(1)
+ storm.log("TesterSpout emit a tuple")
+ word = choice(words)
+ id = str(uuid4())
+ self.pending[id] = word
+ if self.count < 2:
+ storm.rpcMetrics("my-custom-shellspout-metric", 1)
+ storm.log("TesterSpout update my-custom-shellspout-metric")
+ self.count = self.count + 1
+ storm.emit([word], id=id)
+
+ def ack(self, id):
+ del self.pending[id]
+
+ def fail(self, id):
+ storm.log("emitting " + self.pending[id] + " on fail")
+ storm.emit([self.pending[id]], id=id)
+
+TesterSpout().run()
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
new file mode 100644
index 0000000..20387ed
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/AssignableShellMetric.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.AssignableMetric;
+
+public class AssignableShellMetric extends AssignableMetric implements IShellMetric {
+ public AssignableShellMetric(Object value) {
+ super(value);
+ }
+
+ public void updateMetricFromRPC(Object value) {
+ setValue(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
new file mode 100644
index 0000000..231c571
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CombinedShellMetric.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+
+public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
+ public CombinedShellMetric(ICombiner combiner) {
+ super(combiner);
+ }
+
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
new file mode 100644
index 0000000..def74c2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.CountMetric;
+
+public class CountShellMetric extends CountMetric implements IShellMetric {
+ /***
+ * @param
+ * params should be null or long
+ * if value is null, it will call incr()
+ * if value is long, it will call incrBy((long)params)
+ * */
+ public void updateMetricFromRPC(Object value) {
+ if (value == null) {
+ incr();
+ } else if (value instanceof Long) {
+ incrBy((Long)value);
+ } else {
+ throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
new file mode 100644
index 0000000..d53baea
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IMetric;
+
+public interface IShellMetric extends IMetric {
+ /***
+ * @function
+ * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric
+ * @param
+ * value used to update metric, its's meaning change according implementation
+ * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
+ * */
+ public void updateMetricFromRPC(Object value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
new file mode 100644
index 0000000..097ed51
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/ReducedShellMetric.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.api.rpc;
+
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
+
+ public ReducedShellMetric(IReducer reducer) {
+ super(reducer);
+ }
+
+ public void updateMetricFromRPC(Object value) {
+ update(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index d94464e..4d3c3f8 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -135,6 +135,16 @@ public class JsonSerializer implements ISerializer {
shellMsg.addAnchor((String) o);
}
}
+
+ Object nameObj = msg.get("name");
+ String metricName = null;
+ if (nameObj != null && nameObj instanceof String) {
+ metricName = (String) nameObj;
+ }
+ shellMsg.setMetricName(metricName);
+
+ Object paramsObj = msg.get("params");
+ shellMsg.setMetricParams(paramsObj);
return shellMsg;
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
index b78a960..ed803c4 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -42,6 +42,10 @@ public class ShellMsg {
private List<Object> tuple;
private boolean needTaskIds;
+ //metrics rpc
+ private String metricName;
+ private Object metricParams;
+
public String getCommand() {
return command;
}
@@ -119,4 +123,20 @@ public class ShellMsg {
public void setNeedTaskIds(boolean needTaskIds) {
this.needTaskIds = needTaskIds;
}
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public String getMetricName() {
+ return this.metricName;
+ }
+
+ public void setMetricParams(Object metricParams) {
+ this.metricParams = metricParams;
+ }
+
+ public Object getMetricParams() {
+ return metricParams;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index d6a18e7..9f4b38e 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -18,6 +18,8 @@
package backtype.storm.spout;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.multilang.ShellMsg;
import backtype.storm.multilang.SpoutMsg;
import backtype.storm.task.TopologyContext;
@@ -25,6 +27,7 @@ import backtype.storm.utils.ShellProcess;
import java.util.Map;
import java.util.List;
import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +38,9 @@ public class ShellSpout implements ISpout {
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
+
+ private TopologyContext _context;
+
private SpoutMsg _spoutMsg;
public ShellSpout(ShellComponent component) {
@@ -48,6 +54,7 @@ public class ShellSpout implements ISpout {
public void open(Map stormConf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
+ _context = context;
_process = new ShellProcess(_command);
@@ -85,6 +92,34 @@ public class ShellSpout implements ISpout {
_spoutMsg.setId(msgId);
querySubprocess();
}
+
+ private void handleMetrics(ShellMsg shellMsg) {
+ //get metric name
+ String name = shellMsg.getMetricName();
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
+
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = shellMsg.getMetricParams();
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
private void querySubprocess() {
try {
@@ -111,6 +146,8 @@ public class ShellSpout implements ISpout {
} else {
_collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
}
+ } else if (command.equals("metrics")) {
+ handleMetrics(shellMsg);
} else {
throw new RuntimeException("Unknown command received: " + command);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 81aca02..b02a7fd 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -19,6 +19,9 @@ package backtype.storm.task;
import backtype.storm.Config;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
+import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.ShellProcess;
import backtype.storm.multilang.BoltMsg;
@@ -76,6 +79,8 @@ public class ShellBolt implements IBolt {
private Thread _readerThread;
private Thread _writerThread;
+
+ private TopologyContext _context;
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
@@ -93,6 +98,9 @@ public class ShellBolt implements IBolt {
}
_rand = new Random();
_collector = collector;
+
+ _context = context;
+
_process = new ShellProcess(_command);
//subprocesses must send their pid first thing
@@ -118,6 +126,8 @@ public class ShellBolt implements IBolt {
LOG.info("Shell msg: " + msg);
} else if (command.equals("emit")) {
handleEmit(shellMsg);
+ } else if (command.equals("metrics")) {
+ handleMetrics(shellMsg);
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@ -223,6 +233,34 @@ public class ShellBolt implements IBolt {
shellMsg.getStream(), anchors, shellMsg.getTuple());
}
}
+
+ private void handleMetrics(ShellMsg shellMsg) {
+ //get metric name
+ String name = shellMsg.getMetricName();
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
+
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = shellMsg.getMetricParams();
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
private void die(Throwable exception) {
_exception = exception;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 10e630c..356edb3 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -271,6 +271,30 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
return metric;
}
+ /**
+ * Get component's metric from registered metrics by name.
+ * Notice: Normally, one component can only register one metric name once.
+ * But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
+ * cause the same metric name can register twice.
+ * So we just return the first metric we meet.
+ */
+ public IMetric getRegisteredMetricByName(String name) {
+ IMetric metric = null;
+
+ for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
+ Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
+ if (nameToMetric != null) {
+ metric = nameToMetric.get(name);
+ if (metric != null) {
+ //we just return the first metric we meet
+ break;
+ }
+ }
+ }
+
+ return metric;
+ }
+
/*
* Convinience method for registering ReducedMetric.
*/
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java
new file mode 100644
index 0000000..646d621
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java
@@ -0,0 +1,32 @@
+package backtype.storm.testing;
+
+import java.util.Map;
+
+import backtype.storm.metric.api.rpc.CountShellMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
+ private static final long serialVersionUID = 1999209252187463355L;
+
+ public PythonShellMetricsBolt(String[] command) {
+ super(command);
+ }
+
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ super.prepare(stormConf, context, collector);
+
+ CountShellMetric cMetric = new CountShellMetric();
+ context.registerMetric("my-custom-shell-metric", cMetric, 5);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java
new file mode 100644
index 0000000..04bd4ae
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsSpout.java
@@ -0,0 +1,35 @@
+package backtype.storm.testing;
+
+import java.util.Map;
+
+import backtype.storm.metric.api.rpc.CountShellMetric;
+import backtype.storm.spout.ShellSpout;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
+ private static final long serialVersionUID = 1999209252187463355L;
+
+ public PythonShellMetricsSpout(String[] command) {
+ super(command);
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+
+ CountShellMetric cMetric = new CountShellMetric();
+ context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("field1"));
+ }
+
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index bec3f0c..9965c81 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -136,6 +136,9 @@ def reportError(msg):
def log(msg):
sendMsgToParent({"command": "log", "msg": msg})
+def rpcMetrics(name, params):
+ sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
def initComponent():
setupInfo = readMsg()
sendpid(setupInfo['pidDir'])
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ecd0adb8/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index 2f5b0e1..4aaa3dc 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -18,8 +18,12 @@
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
- TestAggregatesCounter TestConfBolt AckFailMapTracker])
+ TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout])
+ (:import [backtype.storm.task ShellBolt])
+ (:import [backtype.storm.spout ShellSpout])
(:import [backtype.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
+ (:import [backtype.storm.metric.api.rpc CountShellMetric])
+ (:import [backtype.storm.utils Utils])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
@@ -96,7 +100,10 @@
(deftest test-custom-metric
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ "storm.zookeeper.connection.timeout" 30000
+ "storm.zookeeper.session.timeout" 60000
+ }]
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
@@ -121,12 +128,15 @@
(deftest test-custom-metric-with-multi-tasks
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ "storm.zookeeper.connection.timeout" 30000
+ "storm.zookeeper.session.timeout" 60000
+ }]
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
{"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
- (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+ (submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
@@ -143,6 +153,62 @@
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+(defn mk-shell-bolt-with-metrics-spec
+ [inputs command & kwargs]
+ (let [command (into-array String command)]
+ (apply thrift/mk-bolt-spec inputs
+ (PythonShellMetricsBolt. command) kwargs)))
+
+(deftest test-custom-metric-with-multilang-py
+ (with-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ "storm.zookeeper.connection.timeout" 30000
+ "storm.zookeeper.session.timeout" 60000
+ }]
+ (let [feeder (feeder-spout ["field1"])
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (mk-shell-bolt-with-metrics-spec {"1" :global} ["python" "tester_bolt_metrics.py"])})]
+ (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
+
+ (.feed feeder ["a"] 1)
+ (Thread/sleep 6000)
+ (assert-buckets! "2" "my-custom-shell-metric" [1])
+
+ (Thread/sleep 5000)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0])
+
+ (Thread/sleep 20000)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0])
+
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (Thread/sleep 5000)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2])
+ )))
+
+(defn mk-shell-spout-with-metrics-spec
+ [command & kwargs]
+ (let [command (into-array String command)]
+ (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
+
+(deftest test-custom-metric-with-spout-multilang-py
+ (with-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ "storm.zookeeper.connection.timeout" 30000
+ "storm.zookeeper.session.timeout" 60000}]
+ (let [topology (thrift/mk-topology
+ {"1" (mk-shell-spout-with-metrics-spec ["python" "tester_spout_metrics.py"])}
+ {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
+ (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
+
+ (Thread/sleep 7000)
+ (assert-buckets! "1" "my-custom-shellspout-metric" [2])
+ )))
+
+
(deftest test-builtin-metrics-1
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[09/24] git commit: add cgroup support & storm.py bug fix
Posted by bo...@apache.org.
add cgroup support & storm.py bug fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0279d725
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0279d725
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0279d725
Branch: refs/heads/master
Commit: 0279d725cb6ace2e4f4cb64d48ef236a97f0d785
Parents: ead4237
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 18:48:47 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 18:48:47 2014 +0800
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/supervisor.clj | 3 ++-
.../src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
storm-core/src/jvm/backtype/storm/Config.java | 6 ++++++
storm-core/src/multilang/py/storm.py | 13 +++++++++++--
4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 7566a79..556653a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -474,7 +474,8 @@
(substitute-worker-childopts s port))
logfilename (str "worker-" port ".log")
command (concat
- [(java-cmd) "-server"]
+ (conf WORKER-CHILDCGROUP)
+ [(java-cmd) "-server"]
worker-childopts
topo-worker-childopts
[(str "-Djava.library.path=" jlp)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 28b9202..828606d 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,7 +22,12 @@
:implements [backtype.storm.scheduler.IScheduler]))
(defn sort-slots [all-slots]
- (let [split-up (vals (group-by first all-slots))]
+ (let [split-up
+ (map second
+ (reverse
+ (sort
+ (for [[host ports] (group-by first all-slots)]
+ [(count ports) ports]))))]
(apply interleave-all split-up)
))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ff309a5..531fa14 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,6 +475,12 @@ public class Config extends HashMap<String, Object> {
public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
+ * The cgroup opts provided to workers launched by this supervisor.
+ */
+ public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
+ public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
+
+ /**
* control how many worker receiver threads we need per worker
*/
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index c0387b4..5de3c0d 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -34,9 +34,11 @@ def readMsg():
msg = ""
while True:
line = sys.stdin.readline()[0:-1]
- if line == "end":
+ if not line:
+ raise Exception('Read EOF from stdin')
+ if line[0:-1] == "end":
break
- msg = msg + line + "\n"
+ msg = msg + line
return json_decode(msg[0:-1])
MODE = None
@@ -180,6 +182,13 @@ class BasicBolt(object):
def initialize(self, stormconf, context):
pass
+ def redirect_stdout_to_stderr(self):
+ self.bakup_stdout = sys.stdout
+ sys.stdout = sys.stderr
+
+ def recover_stdout(self):
+ sys.stdout = self.bakup_stdout
+
def process(self, tuple):
pass
[24/24] git commit: Added STORM-200 to Changelog and Readme
Posted by bo...@apache.org.
Added STORM-200 to Changelog and Readme
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8da9572a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8da9572a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8da9572a
Branch: refs/heads/master
Commit: 8da9572ac7d12e843ce8579c9e5629a528a3d0ea
Parents: ff345c1
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 1 15:38:41 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 1 15:38:41 2014 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
README.markdown | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8da9572a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79fff25..7e37996 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
* STORM-372: Typo in storm_env.ini
* STORM-266: Adding shell process pid and name in the log message
* STORM-367: Storm UI REST api documentation.
+ * STORM-200: Proposal for Multilang's Metrics feature
## 0.9.2-incubating
* STORM-66: send taskid on initial handshake
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8da9572a/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index de16a4c..f026e6d 100644
--- a/README.markdown
+++ b/README.markdown
@@ -148,6 +148,7 @@ under the License.
* Krystian Nowak ([@krystiannowak](https://github.com/krystiannowak))
* Parth-Brahmbhatt ([@Parth-Brahmbhatt](https://github.com/Parth-Brahmbhatt))
* Adrian Petrescu ([@apetresc](https://github.com/apetresc))
+* DashengJu ([@dashengju](https://github.com/dashengju))
## Acknowledgements
[15/24] git commit: remove other code and only multilang metircs
Posted by bo...@apache.org.
remove other code and only multilang metircs
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a4b26af6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a4b26af6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a4b26af6
Branch: refs/heads/master
Commit: a4b26af6044b3774fec6224a74b8b3f2b994d535
Parents: 22a6ca9
Author: JuDasheng <ju...@meituan.com>
Authored: Fri Jun 13 11:49:42 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Fri Jun 13 11:49:42 2014 +0800
----------------------------------------------------------------------
build.sh | 9 -
examples/storm-starter/pom.xml | 2 +-
external/storm-kafka/pom.xml | 2 +-
patch/STORM-132_PULL-36.patch | 31 --
pom.xml | 20 +-
.../maven-shade-clojure-transformer/pom.xml | 4 +-
storm-core/dependency-reduced-pom.xml | 359 -------------------
storm-core/pom.xml | 4 -
.../src/clj/backtype/storm/daemon/executor.clj | 67 +---
.../clj/backtype/storm/daemon/supervisor.clj | 5 +-
.../src/clj/backtype/storm/daemon/task.clj | 4 +-
.../backtype/storm/scheduler/EvenScheduler.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 74 ++--
storm-core/src/jvm/backtype/storm/Config.java | 6 -
storm-core/src/multilang/py/storm.py | 17 +-
storm-core/src/ui/public/topology.html | 6 +-
storm-dist/binary/pom.xml | 4 -
17 files changed, 72 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/build.sh
----------------------------------------------------------------------
diff --git a/build.sh b/build.sh
deleted file mode 100755
index 2563c33..0000000
--- a/build.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-
-mvn clean install -DskipTests=true
-
-cd storm-dist/binary
-
-mvn package
-
-cd -
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index b56aa92..903c6e7 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index b7aaccc..4972619 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/patch/STORM-132_PULL-36.patch
----------------------------------------------------------------------
diff --git a/patch/STORM-132_PULL-36.patch b/patch/STORM-132_PULL-36.patch
deleted file mode 100644
index 005ac26..0000000
--- a/patch/STORM-132_PULL-36.patch
+++ /dev/null
@@ -1,31 +0,0 @@
-From 6b275d95fbdbc8374a215ecb2551f0fca3438d81 Mon Sep 17 00:00:00 2001
-From: Kang Xiao <kx...@gmail.com>
-Date: Tue, 18 Feb 2014 23:23:50 +0800
-Subject: [PATCH] STORM-132 sort supervisor by free slot in desending order in
- EvenScheduler to schedule more evenly between supervisor
-
----
- storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
- 1 file changed, 6 insertions(+), 1 deletion(-)
-
-diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-index 28b9202..828606d 100644
---- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-@@ -22,7 +22,12 @@
- :implements [backtype.storm.scheduler.IScheduler]))
-
- (defn sort-slots [all-slots]
-- (let [split-up (vals (group-by first all-slots))]
-+ (let [split-up
-+ (map second
-+ (reverse
-+ (sort
-+ (for [[host ports] (group-by first all-slots)]
-+ [(count ports) ports]))))]
- (apply interleave-all split-up)
- ))
-
---
-1.8.5.1
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c07a471..b7286dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,17 +18,16 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <!--
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
<version>10</version>
</parent>
- -->
+
<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Storm</name>
<description>Distributed and fault-tolerant realtime computation</description>
@@ -282,16 +281,11 @@
</profiles>
<distributionManagement>
- <repository>
- <id>nexus-releases</id>
- <name>nexus-releases</name>
- <url>http://nexus.sankuai.com:8081/nexus/content/repositories/releases</url>
- </repository>
- <snapshotRepository>
- <id>nexus-snapshots</id>
- <name>nexus-snapshots</name>
- <url>http://nexus.sankuai.com:8081/nexus/content/repositories/snapshots</url>
- </snapshotRepository>
+ <site>
+ <id>storm.maven.website</id>
+ <name>Storm Website</name>
+ <url>file:///tmp/site</url>
+ </site>
</distributionManagement>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-buildtools/maven-shade-clojure-transformer/pom.xml
----------------------------------------------------------------------
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 1c944d5..a6fbad1 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -36,4 +36,4 @@
<scope>provided</scope>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/dependency-reduced-pom.xml b/storm-core/dependency-reduced-pom.xml
deleted file mode 100644
index 9dacd73..0000000
--- a/storm-core/dependency-reduced-pom.xml
+++ /dev/null
@@ -1,359 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <name>Storm Core</name>
- <description>Storm Core Java API and Clojure implementation.</description>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <resources>
- <resource>
- <directory>../conf</directory>
- </resource>
- <resource>
- <targetPath>META-INF</targetPath>
- <directory>../</directory>
- <includes>
- <include>NOTICE</include>
- </includes>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/dev</directory>
- </testResource>
- <testResource>
- <directory>test/resources</directory>
- </testResource>
- </testResources>
- <plugins>
- <plugin>
- <groupId>com.theoryinpractise</groupId>
- <artifactId>clojure-maven-plugin</artifactId>
- <extensions>true</extensions>
- <executions>
- <execution>
- <id>compile-clojure</id>
- <phase>compile</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>test-clojure</id>
- <phase>test</phase>
- <goals>
- <goal>test-with-junit</goal>
- </goals>
- <configuration>
- <vmargs>${test.extra.args}</vmargs>
- </configuration>
- </execution>
- </executions>
- <configuration>
- <sourceDirectories>
- <sourceDirectory>src/clj</sourceDirectory>
- </sourceDirectories>
- <testSourceDirectories>
- <testSourceDirectory>test/clj</testSourceDirectory>
- </testSourceDirectories>
- <warnOnReflection>false</warnOnReflection>
- <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
- <copiedNamespaces>
- <copiedNamespace>none</copiedNamespace>
- </copiedNamespaces>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-report-plugin</artifactId>
- <configuration>
- <reportsDirectories>
- <file>${project.build.directory}/test-reports</file>
- </reportsDirectories>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>maven-shade-clojure-transformer</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- <configuration>
- <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
- <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- <minimizeJar>false</minimizeJar>
- <artifactSet>
- <includes>
- <include>org.apache.thrift:*</include>
- <include>org.apache.storm:*</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>org.apache.thrift</pattern>
- <shadedPattern>org.apache.thrift7</shadedPattern>
- </relocation>
- </relocations>
- <transformers>
- <transformer />
- </transformers>
- <filters>
- <filter>
- <artifact>org.apache.thrift:*</artifact>
- <excludes>
- <exclude>META-INF/LICENSE.txt</exclude>
- <exclude>META-INF/NOTICE.txt</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>clojure</artifactId>
- <version>1.5.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>clj-time</groupId>
- <artifactId>clj-time</artifactId>
- <version>0.4.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>compojure</groupId>
- <artifactId>compojure</artifactId>
- <version>1.1.3</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>hiccup</groupId>
- <artifactId>hiccup</artifactId>
- <version>0.3.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>ring</groupId>
- <artifactId>ring-devel</artifactId>
- <version>0.3.11</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>ring</groupId>
- <artifactId>ring-jetty-adapter</artifactId>
- <version>0.3.11</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.logging</artifactId>
- <version>0.2.3</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>math.numeric-tower</artifactId>
- <version>0.0.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.cli</artifactId>
- <version>0.2.4</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.nrepl</artifactId>
- <version>0.2.3</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <artifactId>clojure</artifactId>
- <groupId>org.clojure</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>clojure-complete</groupId>
- <artifactId>clojure-complete</artifactId>
- <version>0.2.3</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <artifactId>clojure</artifactId>
- <groupId>org.clojure</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.4</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-exec</artifactId>
- <version>1.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.5</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.7.0</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>2.4.0</version>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>carbonite</artifactId>
- <version>1.4.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- <version>1.11</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.3.3</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.googlecode.disruptor</groupId>
- <artifactId>disruptor</artifactId>
- <version>2.10.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.jgrapht</groupId>
- <artifactId>jgrapht-core</artifactId>
- <version>0.9.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>13.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.0.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>1.6.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.6.3.Final</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.9.5</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.clojars.runa</groupId>
- <artifactId>conjure</artifactId>
- <version>2.1.3</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>reply</groupId>
- <artifactId>reply</artifactId>
- <version>0.3.0</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>cd-client</artifactId>
- <groupId>org.thnetos</groupId>
- </exclusion>
- <exclusion>
- <artifactId>drawbridge</artifactId>
- <groupId>com.cemerick</groupId>
- </exclusion>
- <exclusion>
- <artifactId>versioneer</artifactId>
- <groupId>trptcolin</groupId>
- </exclusion>
- <exclusion>
- <artifactId>sjacket</artifactId>
- <groupId>org.clojars.trptcolin</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 91cd370..134eeb8 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,11 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
-<<<<<<< HEAD
- <version>0.9.2-incubating-mt0000</version>
-=======
<version>0.9.3-incubating-SNAPSHOT</version>
->>>>>>> upstream/master
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 64e60be..1bbe53d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -18,7 +18,6 @@
(:use [backtype.storm bootstrap])
(:import [backtype.storm.hooks ITaskHook])
(:import [backtype.storm.tuple Tuple])
- (:import [backtype.storm.tuple MessageId])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -390,25 +389,12 @@
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
task-ids (:task-ids executor-data)
debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
- component-id (:component-id executor-data)
- executor-id (:executor-id executor-data)
- executor-type (:type executor-data)
]
(disruptor/clojure-handler
(fn [tuple-batch sequence-id end-of-batch?]
(fast-list-iter [[task-id msg] tuple-batch]
- (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))
- tuple-streamid (.getSourceStreamId tuple)
- tuple-source (.getSourceComponent tuple)
- tuple-id (.getMessageId tuple)
- tuple-values (.getValues tuple)
- ]
- (when debug?
- (if (= tuple-streamid "default")
- (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-id "] TupleValue[" tuple-values "]")
- (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-values "]")
- )
- )
+ (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
+ (when debug? (log-message "Processing received message " tuple))
(if task-id
(tuple-action-fn task-id tuple)
;; null task ids are broadcast tuples
@@ -435,7 +421,6 @@
last-active (atom false)
spouts (ArrayList. (map :object (vals task-datas)))
rand (Random. (Utils/secureRandomLong))
- debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
@@ -443,12 +428,9 @@
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
- (when debug?
- (log-message "Component[" component-id "] FAILED-TUPLE reason[EXPIRED] TupleId[" msg-id "] values[" tuple-info "]"))
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
- (let [stream-id (.getSourceStreamId tuple)
- tuple-id (.getMessageId tuple)]
+ (let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
@@ -459,18 +441,10 @@
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id
- ACKER-ACK-STREAM-ID (do
- (ack-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
- (when debug?
- (log-message "Component[" component-id "] ACK-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
- )
- ACKER-FAIL-STREAM-ID (do
- (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
- (when debug?
- (log-message "Component[" component-id "] FAILED-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
- )
+ ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
+ spout-id tuple-finished-info time-delta)
+ ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
+ spout-id tuple-finished-info time-delta)
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -519,8 +493,6 @@
(transfer-fn out-task
out-tuple
overflow-buffer)
- (when debug?
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" out-stream-id "] TupleId[" tuple-id "] values[" values "]"))
))
(if rooted?
(do
@@ -626,8 +598,6 @@
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
- debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
-
tuple-action-fn (fn [task-id ^TupleImpl tuple]
;; synchronization needs to be done with a key provided by this bolt, otherwise:
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -690,13 +660,7 @@
(tasks-fn task stream values)
(tasks-fn stream values))]
(fast-list-iter [t out-tasks]
- (let [anchors-to-ids (HashMap.)
- out-tuple (TupleImpl. worker-context
- values
- task-id
- stream
- (MessageId/makeId anchors-to-ids))
- ]
+ (let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
@@ -705,15 +669,12 @@
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
- (transfer-fn t out-tuple)
- (when debug?
- (if (= component-id "__acker")
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.get values 0) "]")
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.getMessageId out-tuple) "] values[" values "]")
- )
-
- )
- ))
+ (transfer-fn t
+ (TupleImpl. worker-context
+ values
+ task-id
+ stream
+ (MessageId/makeId anchors-to-ids)))))
(or out-tasks [])))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(if (= component-id Constants/SYSTEM_COMPONENT_ID)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 53b2802..7566a79 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -473,11 +473,8 @@
topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
(substitute-worker-childopts s port))
logfilename (str "worker-" port ".log")
- worker-childcgroup (when-let [s (conf WORKER-CHILDCGROUP)]
- (.split s " "))
command (concat
- worker-childcgroup
- [(java-cmd) "-server"]
+ [(java-cmd) "-server"]
worker-childopts
topo-worker-childopts
[(str "-Djava.library.path=" jlp)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 29756a1..3650150 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -131,7 +131,7 @@
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
+ (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
(let [target-component (.getComponentId worker-context out-task-id)
component->grouping (get stream->component->grouper stream)
grouping (get component->grouping target-component)
@@ -149,7 +149,7 @@
))
([^String stream ^List values]
(when debug?
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
+ (log-message "Emitting: " component-id " " stream " " values))
(let [out-tasks (ArrayList.)]
(fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
(when (= :direct grouper)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 828606d..28b9202 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,12 +22,7 @@
:implements [backtype.storm.scheduler.IScheduler]))
(defn sort-slots [all-slots]
- (let [split-up
- (map second
- (reverse
- (sort
- (for [[host ports] (group-by first all-slots)]
- [(count ports) ports]))))]
+ (let [split-up (vals (group-by first all-slots))]
(apply interleave-all split-up)
))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 4098038..5f2bcba 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -780,43 +780,43 @@
(let [id (url-decode id)
component (url-decode component)]
(json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
- ;(POST "/api/v1/topology/:id/activate" [id]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)]
- ; (.activate nimbus name)
- ; (log-message "Activating topology '" name "'")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
-
- ;(POST "/api/v1/topology/:id/deactivate" [id]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)]
- ; (.deactivate nimbus name)
- ; (log-message "Deactivating topology '" name "'")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
- ;(POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)
- ; options (RebalanceOptions.)]
- ; (.set_wait_secs options (Integer/parseInt wait-time))
- ; (.rebalance nimbus name options)
- ; (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
- ;(POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)
- ; options (KillOptions.)]
- ; (.set_wait_secs options (Integer/parseInt wait-time))
- ; (.killTopologyWithOpts nimbus name options)
- ; (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/activate" [id]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.activate nimbus name)
+ (log-message "Activating topology '" name "'")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+
+ (POST "/api/v1/topology/:id/deactivate" [id]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.deactivate nimbus name)
+ (log-message "Deactivating topology '" name "'")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (RebalanceOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.rebalance nimbus name options)
+ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (KillOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.killTopologyWithOpts nimbus name options)
+ (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/api/v1/topology/" id)))
(GET "/" [:as {cookies :cookies}]
(resp/redirect "/index.html"))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 531fa14..ff309a5 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,12 +475,6 @@ public class Config extends HashMap<String, Object> {
public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
- * The cgroup opts provided to workers launched by this supervisor.
- */
- public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
- public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
-
- /**
* control how many worker receiver threads we need per worker
*/
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index a4c8c2c..9965c81 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -33,12 +33,10 @@ json_decode = lambda x: json.loads(x)
def readMsg():
msg = ""
while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
+ line = sys.stdin.readline()[0:-1]
+ if line == "end":
break
- msg = msg + line
+ msg = msg + line + "\n"
return json_decode(msg[0:-1])
MODE = None
@@ -137,7 +135,7 @@ def reportError(msg):
def log(msg):
sendMsgToParent({"command": "log", "msg": msg})
-
+
def rpcMetrics(name, params):
sendMsgToParent({"command": "metrics", "name": name, "params": params})
@@ -182,13 +180,6 @@ class BasicBolt(object):
def initialize(self, stormconf, context):
pass
- def redirect_stdout_to_stderr(self):
- self.bakup_stdout = sys.stdout
- sys.stdout = sys.stderr
-
- def recover_stdout(self):
- sys.stdout = self.bakup_stdout
-
def process(self, tuple):
pass
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 2ebab42..1020719 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -36,13 +36,11 @@
<h2>Topology summary</h2>
<div id="topology-summary">
</div>
-<!--
<div id="topology-actions">
<h2 class="js-only">Topology actions</h2>
<p id="topology-actions" class="js-only">
</p>
</div>
--->
<div id="topology-stats"></div>
<div id="spout-stats">
</div>
@@ -79,13 +77,13 @@ $(document).ready(function() {
var spoutStats = $("#spout-stats");
var boltStats = $("#bolt-stats");
var config = $("#topology-configuration");
- //var topologyActions = $("#topology-actions");
+ var topologyActions = $("#topology-actions");
var topologyVisualization = $("#topology-visualization")
var formattedConfig = formatConfigData(response["configuration"]);
var buttonJsonData = topologyActionJson(response["id"],response["name"],response["status"],response["msgTimeout"]);
$.get("/templates/topology-page-template.html", function(template) {
topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
- //topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
+ topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response));
$("#topology-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
spoutStats.append(Mustache.render($(template).filter("#spout-stats-template").html(),response));
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 464ecd7..0d97c0b 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,11 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
-<<<<<<< HEAD
- <version>0.9.2-incubating-mt0000</version>
-=======
<version>0.9.3-incubating-SNAPSHOT</version>
->>>>>>> upstream/master
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
[07/24] git commit: merge 0.9.2 incubating
Posted by bo...@apache.org.
merge 0.9.2 incubating
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/bc391722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/bc391722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/bc391722
Branch: refs/heads/master
Commit: bc391722098c1e66f6ebe0377a3de6b51a7cf38c
Parents: e2a7f7c c89fb82
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 17:43:29 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 17:43:29 2014 +0800
----------------------------------------------------------------------
.gitignore | 4 +-
CHANGELOG.md | 64 +
DEVELOPER.md | 313 +++++
NOTICE | 6 +-
README.markdown | 12 +-
SECURITY.md | 74 ++
bin/storm | 59 +-
bin/storm-config.cmd | 27 +-
bin/storm.cmd | 59 +-
conf/defaults.yaml | 11 +-
conf/storm.yaml.example | 2 +-
conf/storm_env.ini | 30 +
examples/storm-starter/README.markdown | 127 ++
.../multilang/resources/splitsentence.py | 24 +
.../multilang/resources/splitsentence.rb | 26 +
.../storm-starter/multilang/resources/storm.py | 221 ++++
.../storm-starter/multilang/resources/storm.rb | 200 +++
examples/storm-starter/pom.xml | 166 +++
.../src/clj/storm/starter/clj/word_count.clj | 95 ++
.../jvm/storm/starter/BasicDRPCTopology.java | 78 ++
.../jvm/storm/starter/ExclamationTopology.java | 87 ++
.../src/jvm/storm/starter/ManualDRPC.java | 68 +
.../jvm/storm/starter/PrintSampleStream.java | 58 +
.../src/jvm/storm/starter/ReachTopology.java | 196 +++
.../src/jvm/storm/starter/RollingTopWords.java | 78 ++
.../jvm/storm/starter/SingleJoinExample.java | 64 +
.../storm/starter/TransactionalGlobalCount.java | 173 +++
.../jvm/storm/starter/TransactionalWords.java | 246 ++++
.../jvm/storm/starter/WordCountTopology.java | 107 ++
.../storm/starter/bolt/AbstractRankerBolt.java | 110 ++
.../starter/bolt/IntermediateRankingsBolt.java | 58 +
.../src/jvm/storm/starter/bolt/PrinterBolt.java | 37 +
.../storm/starter/bolt/RollingCountBolt.java | 142 ++
.../jvm/storm/starter/bolt/SingleJoinBolt.java | 114 ++
.../storm/starter/bolt/TotalRankingsBolt.java | 59 +
.../starter/spout/RandomSentenceSpout.java | 64 +
.../storm/starter/spout/TwitterSampleSpout.java | 164 +++
.../tools/NthLastModifiedTimeTracker.java | 70 +
.../src/jvm/storm/starter/tools/Rankable.java | 32 +
.../starter/tools/RankableObjectWithFields.java | 148 +++
.../src/jvm/storm/starter/tools/Rankings.java | 156 +++
.../starter/tools/SlidingWindowCounter.java | 119 ++
.../storm/starter/tools/SlotBasedCounter.java | 118 ++
.../jvm/storm/starter/trident/TridentReach.java | 156 +++
.../storm/starter/trident/TridentWordCount.java | 85 ++
.../src/jvm/storm/starter/util/StormRunner.java | 39 +
.../jvm/storm/starter/util/TupleHelpers.java | 33 +
.../bolt/IntermediateRankingsBoltTest.java | 146 +++
.../starter/bolt/RollingCountBoltTest.java | 113 ++
.../starter/bolt/TotalRankingsBoltTest.java | 147 +++
.../storm/starter/tools/MockTupleHelpers.java | 40 +
.../tools/NthLastModifiedTimeTrackerTest.java | 125 ++
.../tools/RankableObjectWithFieldsTest.java | 252 ++++
.../jvm/storm/starter/tools/RankingsTest.java | 368 ++++++
.../starter/tools/SlidingWindowCounterTest.java | 106 ++
.../starter/tools/SlotBasedCounterTest.java | 181 +++
external/storm-kafka/CHANGELOG.md | 13 +
external/storm-kafka/README.md | 28 +
external/storm-kafka/pom.xml | 138 ++
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 81 ++
.../src/jvm/storm/kafka/BrokerHosts.java | 25 +
.../jvm/storm/kafka/DynamicBrokersReader.java | 147 +++
.../kafka/DynamicPartitionConnections.java | 94 ++
.../jvm/storm/kafka/FailedFetchException.java | 29 +
.../src/jvm/storm/kafka/KafkaConfig.java | 52 +
.../src/jvm/storm/kafka/KafkaError.java | 43 +
.../src/jvm/storm/kafka/KafkaSpout.java | 195 +++
.../src/jvm/storm/kafka/KafkaUtils.java | 243 ++++
.../src/jvm/storm/kafka/KeyValueScheme.java | 28 +
.../kafka/KeyValueSchemeAsMultiScheme.java | 37 +
.../src/jvm/storm/kafka/Partition.java | 64 +
.../jvm/storm/kafka/PartitionCoordinator.java | 28 +
.../src/jvm/storm/kafka/PartitionManager.java | 265 ++++
.../src/jvm/storm/kafka/SpoutConfig.java | 36 +
.../src/jvm/storm/kafka/StaticCoordinator.java | 51 +
.../src/jvm/storm/kafka/StaticHosts.java | 38 +
.../storm/kafka/StaticPartitionConnections.java | 52 +
.../jvm/storm/kafka/StringKeyValueScheme.java | 37 +
.../src/jvm/storm/kafka/StringScheme.java | 46 +
.../src/jvm/storm/kafka/ZkCoordinator.java | 113 ++
.../src/jvm/storm/kafka/ZkHosts.java | 36 +
.../src/jvm/storm/kafka/ZkState.java | 116 ++
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 89 ++
.../jvm/storm/kafka/trident/Coordinator.java | 50 +
.../storm/kafka/trident/DefaultCoordinator.java | 31 +
.../trident/GlobalPartitionInformation.java | 98 ++
.../storm/kafka/trident/IBatchCoordinator.java | 26 +
.../jvm/storm/kafka/trident/IBrokerReader.java | 25 +
.../src/jvm/storm/kafka/trident/MaxMetric.java | 40 +
.../kafka/trident/OpaqueTridentKafkaSpout.java | 59 +
.../storm/kafka/trident/StaticBrokerReader.java | 36 +
.../trident/TransactionalTridentKafkaSpout.java | 58 +
.../storm/kafka/trident/TridentKafkaConfig.java | 37 +
.../kafka/trident/TridentKafkaEmitter.java | 268 ++++
.../jvm/storm/kafka/trident/ZkBrokerReader.java | 70 +
.../storm/kafka/DynamicBrokersReaderTest.java | 155 +++
.../src/test/storm/kafka/KafkaErrorTest.java | 39 +
.../src/test/storm/kafka/KafkaTestBroker.java | 75 ++
.../src/test/storm/kafka/KafkaUtilsTest.java | 225 ++++
.../storm/kafka/StringKeyValueSchemeTest.java | 38 +
.../src/test/storm/kafka/TestUtils.java | 20 +
.../src/test/storm/kafka/ZkCoordinatorTest.java | 130 ++
.../test/storm/kafka/bolt/KafkaBoltTest.java | 171 +++
logback/cluster.xml | 2 +-
pom.xml | 69 +-
storm-core/pom.xml | 49 +-
.../clj/backtype/storm/command/rebalance.clj | 7 +-
storm-core/src/clj/backtype/storm/config.clj | 2 +-
.../src/clj/backtype/storm/daemon/common.clj | 6 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 17 +-
.../src/clj/backtype/storm/daemon/executor.clj | 43 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 41 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 9 -
.../clj/backtype/storm/daemon/supervisor.clj | 66 +-
.../src/clj/backtype/storm/daemon/worker.clj | 76 +-
storm-core/src/clj/backtype/storm/disruptor.clj | 10 +-
.../src/clj/backtype/storm/messaging/loader.clj | 81 +-
.../src/clj/backtype/storm/messaging/local.clj | 20 +-
storm-core/src/clj/backtype/storm/testing.clj | 5 +-
storm-core/src/clj/backtype/storm/timer.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 1209 +++++++-----------
.../src/clj/backtype/storm/ui/helpers.clj | 5 +-
storm-core/src/clj/backtype/storm/util.clj | 17 +-
storm-core/src/clj/backtype/storm/zookeeper.clj | 10 +-
storm-core/src/jvm/backtype/storm/Config.java | 38 +-
.../jvm/backtype/storm/ConfigValidation.java | 17 +
.../src/jvm/backtype/storm/StormSubmitter.java | 176 ++-
.../backtype/storm/messaging/IConnection.java | 15 +-
.../backtype/storm/messaging/netty/Client.java | 333 +++--
.../backtype/storm/messaging/netty/Context.java | 57 +-
.../storm/messaging/netty/ControlMessage.java | 6 +-
.../storm/messaging/netty/MessageBatch.java | 52 +-
.../storm/messaging/netty/MessageDecoder.java | 108 +-
.../netty/NettyRenameThreadFactory.java | 35 +
.../backtype/storm/messaging/netty/Server.java | 147 ++-
.../netty/StormClientErrorHandler.java | 41 +
.../messaging/netty/StormClientHandler.java | 121 --
.../netty/StormClientPipelineFactory.java | 2 +-
.../messaging/netty/StormServerHandler.java | 40 +-
.../jvm/backtype/storm/multilang/BoltMsg.java | 63 +
.../backtype/storm/multilang/ISerializer.java | 65 +
.../storm/multilang/JsonSerializer.java | 162 +++
.../storm/multilang/NoOutputException.java | 23 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 105 ++
.../jvm/backtype/storm/multilang/SpoutMsg.java | 34 +
.../jvm/backtype/storm/spout/ShellSpout.java | 86 +-
.../src/jvm/backtype/storm/task/ShellBolt.java | 115 +-
.../backtype/storm/task/TopologyContext.java | 9 +
.../storm/testing/TestEventLogSpout.java | 139 ++
.../storm/testing/TestEventOrderCheckBolt.java | 76 ++
.../transactional/state/TransactionalState.java | 2 +-
.../backtype/storm/utils/DisruptorQueue.java | 9 +-
.../jvm/backtype/storm/utils/ShellProcess.java | 169 ++-
.../src/jvm/backtype/storm/utils/Time.java | 18 +-
.../backtype/storm/utils/TransferDrainer.java | 113 ++
.../src/jvm/backtype/storm/utils/Utils.java | 117 +-
storm-core/src/jvm/storm/trident/Stream.java | 5 +-
.../trident/spout/RichSpoutBatchTriggerer.java | 1 +
.../jvm/storm/trident/state/map/OpaqueMap.java | 6 +-
.../trident/state/map/RemovableMapState.java | 8 +
.../storm/trident/testing/MemoryMapState.java | 27 +-
.../topology/state/TransactionalState.java | 2 +-
.../jvm/storm/trident/util/TridentUtils.java | 14 +-
storm-core/src/ui/public/component.html | 88 ++
storm-core/src/ui/public/index.html | 73 ++
storm-core/src/ui/public/js/arbor-graphics.js | 51 +
storm-core/src/ui/public/js/arbor-tween.js | 86 ++
storm-core/src/ui/public/js/arbor.js | 67 +
storm-core/src/ui/public/js/jquery.mustache.js | 592 +++++++++
storm-core/src/ui/public/js/purl.js | 267 ++++
storm-core/src/ui/public/js/script.js | 51 +-
storm-core/src/ui/public/js/visualization.js | 403 ++++++
.../templates/component-page-template.html | 152 +++
.../public/templates/index-page-template.html | 62 +
.../public/templates/json-error-template.html | 4 +
.../templates/topology-page-template.html | 128 ++
storm-core/src/ui/public/topology.html | 90 ++
.../test/clj/backtype/storm/config_test.clj | 20 +
.../test/clj/backtype/storm/drpc_test.clj | 27 +-
.../clj/backtype/storm/integration_test.clj | 9 +-
.../storm/messaging/netty_unit_test.clj | 46 +-
.../test/clj/backtype/storm/messaging_test.clj | 35 +-
.../test/clj/backtype/storm/metrics_test.clj | 24 +
.../test/clj/backtype/storm/multilang_test.clj | 6 +-
.../test/clj/backtype/storm/nimbus_test.clj | 16 +-
.../test/clj/backtype/storm/supervisor_test.clj | 66 +-
.../test/clj/backtype/storm/utils_test.clj | 13 +-
.../test/clj/storm/trident/integration_test.clj | 2 +-
.../test/clj/storm/trident/state_test.clj | 33 +-
storm-dist/binary/NOTICE | 9 +-
storm-dist/binary/pom.xml | 40 +-
storm-dist/binary/src/main/assembly/binary.xml | 41 +
storm-dist/source/pom.xml | 38 +-
193 files changed, 14785 insertions(+), 1693 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bc391722/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index f54cf72,d6a18e7..368f16a
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@@ -18,18 -18,15 +18,18 @@@
package backtype.storm.spout;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
+ import backtype.storm.multilang.ShellMsg;
+ import backtype.storm.multilang.SpoutMsg;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import java.util.Map;
import java.util.List;
import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.json.simple.JSONObject;
public class ShellSpout implements ISpout {
@@@ -38,9 -35,8 +38,11 @@@
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
+
+ private TopologyContext _context;
+
+ private SpoutMsg _spoutMsg;
+
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@@ -48,118 -44,75 +50,110 @@@
public ShellSpout(String... command) {
_command = command;
}
-
+
public void open(Map stormConf, TopologyContext context,
SpoutOutputCollector collector) {
- _process = new ShellProcess(_command);
_collector = collector;
+ _context = context;
- try {
- Number subpid = _process.launch(stormConf, context);
- LOG.info("Launched subprocess with pid " + subpid);
- } catch (IOException e) {
- throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
- }
+ _process = new ShellProcess(_command);
+
+ Number subpid = _process.launch(stormConf, context);
+ LOG.info("Launched subprocess with pid " + subpid);
}
public void close() {
_process.destroy();
}
- private JSONObject _next;
public void nextTuple() {
- if (_next == null) {
- _next = new JSONObject();
- _next.put("command", "next");
+ if (_spoutMsg == null) {
+ _spoutMsg = new SpoutMsg();
}
-
- querySubprocess(_next);
+ _spoutMsg.setCommand("next");
+ _spoutMsg.setId("");
+ querySubprocess();
}
- private JSONObject _ack;
public void ack(Object msgId) {
- if (_ack == null) {
- _ack = new JSONObject();
- _ack.put("command", "ack");
+ if (_spoutMsg == null) {
+ _spoutMsg = new SpoutMsg();
}
-
- _ack.put("id", msgId);
- querySubprocess(_ack);
+ _spoutMsg.setCommand("ack");
+ _spoutMsg.setId(msgId);
+ querySubprocess();
}
- private JSONObject _fail;
public void fail(Object msgId) {
- if (_fail == null) {
- _fail = new JSONObject();
- _fail.put("command", "fail");
+ if (_spoutMsg == null) {
+ _spoutMsg = new SpoutMsg();
}
-
- _fail.put("id", msgId);
- querySubprocess(_fail);
+ _spoutMsg.setCommand("fail");
+ _spoutMsg.setId(msgId);
+ querySubprocess();
}
+
+ private void handleMetrics(Map action) {
+ //get metric name
+ Object nameObj = action.get("name");
+ if (nameObj == null || !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is null or is not String");
+ }
+ String name = (String) nameObj;
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
+
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = action.get("params");
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- private void querySubprocess(Object query) {
+ private void querySubprocess() {
try {
- _process.writeMessage(query);
+ _process.writeSpoutMsg(_spoutMsg);
while (true) {
- JSONObject action = _process.readMessage();
- String command = (String) action.get("command");
+ ShellMsg shellMsg = _process.readShellMsg();
+ String command = shellMsg.getCommand();
if (command.equals("sync")) {
return;
} else if (command.equals("log")) {
- String msg = (String) action.get("msg");
+ String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
} else if (command.equals("emit")) {
- String stream = (String) action.get("stream");
- if (stream == null) stream = Utils.DEFAULT_STREAM_ID;
- Long task = (Long) action.get("task");
- List<Object> tuple = (List) action.get("tuple");
- Object messageId = (Object) action.get("id");
- if (task == null) {
+ String stream = shellMsg.getStream();
+ Long task = shellMsg.getTask();
+ List<Object> tuple = shellMsg.getTuple();
+ Object messageId = shellMsg.getId();
+ if (task == 0) {
List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
- Object need_task_ids = action.get("need_task_ids");
- if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
- _process.writeMessage(outtasks);
+ if (shellMsg.areTaskIdsNeeded()) {
+ _process.writeTaskIds(outtasks);
}
} else {
- _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
+ _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
}
+ } else if (command.equals("metrics")) {
+ handleMetrics(action);
+ } else {
+ throw new RuntimeException("Unknown command received: " + command);
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bc391722/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index de3c3f2,81aca02..50d6e20
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@@ -19,15 -19,13 +19,16 @@@ package backtype.storm.task
import backtype.storm.Config;
import backtype.storm.generated.ShellComponent;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.rpc.IShellMetric;
+import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
- import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellProcess;
- import java.io.IOException;
+ import backtype.storm.multilang.BoltMsg;
+ import backtype.storm.multilang.ShellMsg;
+
import java.util.ArrayList;
- import java.util.Arrays;
+ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.TimeUnit.SECONDS;
@@@ -77,11 -73,9 +76,11 @@@ public class ShellBolt implements IBol
private volatile Throwable _exception;
private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
private Random _rand;
-
+
private Thread _readerThread;
private Thread _writerThread;
+
+ private TopologyContext _context;
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
@@@ -98,42 -92,32 +97,42 @@@
this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
}
_rand = new Random();
- _process = new ShellProcess(_command);
_collector = collector;
++<<<<<<< HEAD
+ _context = context;
++=======
+ _process = new ShellProcess(_command);
++>>>>>>> upstream/master
- try {
- //subprocesses must send their pid first thing
- Number subpid = _process.launch(stormConf, context);
- LOG.info("Launched subprocess with pid " + subpid);
- } catch (IOException e) {
- throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
- }
+ //subprocesses must send their pid first thing
+ Number subpid = _process.launch(stormConf, context);
+ LOG.info("Launched subprocess with pid " + subpid);
// reader
_readerThread = new Thread(new Runnable() {
public void run() {
while (_running) {
try {
- JSONObject action = _process.readMessage();
- if (action == null) {
- // ignore sync
- }
+ ShellMsg shellMsg = _process.readShellMsg();
- String command = (String) action.get("command");
+ String command = shellMsg.getCommand();
if(command.equals("ack")) {
- handleAck(action);
+ handleAck(shellMsg.getId());
} else if (command.equals("fail")) {
- handleFail(action);
+ handleFail(shellMsg.getId());
} else if (command.equals("error")) {
- handleError(action);
+ handleError(shellMsg.getMsg());
} else if (command.equals("log")) {
- String msg = (String) action.get("msg");
+ String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
} else if (command.equals("emit")) {
++<<<<<<< HEAD
+ handleEmit(action);
+ } else if (command.equals("metrics")) {
+ handleMetrics(action);
++=======
+ handleEmit(shellMsg);
++>>>>>>> upstream/master
}
} catch (InterruptedException e) {
} catch (Throwable t) {
@@@ -192,9 -179,8 +194,14 @@@
_process.destroy();
_inputs.clear();
}
++<<<<<<< HEAD
+
+ private void handleAck(Map action) {
+ String id = (String) action.get("id");
++=======
+
+ private void handleAck(Object id) {
++>>>>>>> upstream/master
Tuple acked = _inputs.remove(id);
if(acked==null) {
throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
@@@ -242,41 -219,10 +240,42 @@@
_pendingWrites.put(outtasks);
}
} else {
- _collector.emitDirect((int)task.longValue(), stream, anchors, tuple);
+ _collector.emitDirect((int) shellMsg.getTask(),
+ shellMsg.getStream(), anchors, shellMsg.getTuple());
}
}
+
+ private void handleMetrics(Map action) {
+ //get metric name
+ Object nameObj = action.get("name");
+ if (nameObj == null || !(nameObj instanceof String) ) {
+ throw new RuntimeException("Receive Metrics name is null or is not String");
+ }
+ String name = (String) nameObj;
+ if (name.isEmpty()) {
+ throw new RuntimeException("Receive Metrics name is empty");
+ }
+
+ //get metric by name
+ IMetric iMetric = _context.getRegisteredMetricByName(name);
+ if (iMetric == null) {
+ throw new RuntimeException("Not find metric by name["+name+"] ");
+ }
+ if ( !(iMetric instanceof IShellMetric)) {
+ throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
+ }
+ IShellMetric iShellMetric = (IShellMetric)iMetric;
+
+ //call updateMetricFromRPC with params
+ Object paramsObj = action.get("params");
+ try {
+ iShellMetric.updateMetricFromRPC(paramsObj);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
private void die(Throwable exception) {
_exception = exception;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bc391722/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
[14/24] git commit: merge apache-storm master
Posted by bo...@apache.org.
merge apache-storm master
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/22a6ca9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/22a6ca9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/22a6ca9b
Branch: refs/heads/master
Commit: 22a6ca9b94cf68709c16436acc7ad4ac6c7c9d7a
Parents: 4d6a27f 2a60e99
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Jun 12 18:08:02 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Jun 12 18:08:02 2014 +0800
----------------------------------------------------------------------
CHANGELOG.md | 2 +-
LICENSE | 124 ++++++-
examples/storm-starter/pom.xml | 5 +-
external/storm-kafka/README.md | 33 +-
external/storm-kafka/pom.xml | 27 +-
.../storm/kafka/DynamicBrokersReaderTest.java | 17 +
.../src/test/storm/kafka/KafkaErrorTest.java | 17 +
.../src/test/storm/kafka/KafkaTestBroker.java | 17 +
.../src/test/storm/kafka/KafkaUtilsTest.java | 17 +
.../storm/kafka/StringKeyValueSchemeTest.java | 17 +
.../src/test/storm/kafka/TestUtils.java | 17 +
.../src/test/storm/kafka/ZkCoordinatorTest.java | 17 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 17 +
pom.xml | 40 ++-
storm-core/dependency-reduced-pom.xml | 359 +++++++++++++++++++
storm-core/pom.xml | 4 +
.../netty/NettyRenameThreadFactory.java | 17 +
.../jvm/backtype/storm/multilang/BoltMsg.java | 17 +
.../backtype/storm/multilang/ISerializer.java | 17 +
.../storm/multilang/JsonSerializer.java | 17 +
.../storm/multilang/NoOutputException.java | 17 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 17 +
.../jvm/backtype/storm/multilang/SpoutMsg.java | 17 +
.../trident/state/map/RemovableMapState.java | 17 +
storm-core/src/ui/public/component.html | 16 +
storm-core/src/ui/public/index.html | 16 +
storm-core/src/ui/public/js/visualization.js | 20 +-
.../templates/component-page-template.html | 17 +
.../public/templates/index-page-template.html | 16 +
.../public/templates/json-error-template.html | 16 +
.../templates/topology-page-template.html | 16 +
storm-core/src/ui/public/topology.html | 17 +
storm-dist/binary/LICENSE | 154 +++++++-
storm-dist/binary/NOTICE | 2 +-
storm-dist/binary/pom.xml | 4 +
storm-dist/source/pom.xml | 2 +-
36 files changed, 1122 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/examples/storm-starter/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/external/storm-kafka/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --cc storm-core/dependency-reduced-pom.xml
index 0000000,0000000..9dacd73
new file mode 100644
--- /dev/null
+++ b/storm-core/dependency-reduced-pom.xml
@@@ -1,0 -1,0 +1,359 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
++ <parent>
++ <artifactId>storm</artifactId>
++ <groupId>org.apache.storm</groupId>
++ <version>0.9.2-incubating-mt0000</version>
++ </parent>
++ <modelVersion>4.0.0</modelVersion>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-core</artifactId>
++ <name>Storm Core</name>
++ <description>Storm Core Java API and Clojure implementation.</description>
++ <build>
++ <sourceDirectory>src/jvm</sourceDirectory>
++ <resources>
++ <resource>
++ <directory>../conf</directory>
++ </resource>
++ <resource>
++ <targetPath>META-INF</targetPath>
++ <directory>../</directory>
++ <includes>
++ <include>NOTICE</include>
++ </includes>
++ </resource>
++ </resources>
++ <testResources>
++ <testResource>
++ <directory>src/dev</directory>
++ </testResource>
++ <testResource>
++ <directory>test/resources</directory>
++ </testResource>
++ </testResources>
++ <plugins>
++ <plugin>
++ <groupId>com.theoryinpractise</groupId>
++ <artifactId>clojure-maven-plugin</artifactId>
++ <extensions>true</extensions>
++ <executions>
++ <execution>
++ <id>compile-clojure</id>
++ <phase>compile</phase>
++ <goals>
++ <goal>compile</goal>
++ </goals>
++ </execution>
++ <execution>
++ <id>test-clojure</id>
++ <phase>test</phase>
++ <goals>
++ <goal>test-with-junit</goal>
++ </goals>
++ <configuration>
++ <vmargs>${test.extra.args}</vmargs>
++ </configuration>
++ </execution>
++ </executions>
++ <configuration>
++ <sourceDirectories>
++ <sourceDirectory>src/clj</sourceDirectory>
++ </sourceDirectories>
++ <testSourceDirectories>
++ <testSourceDirectory>test/clj</testSourceDirectory>
++ </testSourceDirectories>
++ <warnOnReflection>false</warnOnReflection>
++ <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
++ <copiedNamespaces>
++ <copiedNamespace>none</copiedNamespace>
++ </copiedNamespaces>
++ </configuration>
++ </plugin>
++ <plugin>
++ <artifactId>maven-surefire-report-plugin</artifactId>
++ <configuration>
++ <reportsDirectories>
++ <file>${project.build.directory}/test-reports</file>
++ </reportsDirectories>
++ </configuration>
++ </plugin>
++ <plugin>
++ <artifactId>maven-shade-plugin</artifactId>
++ <version>2.2</version>
++ <executions>
++ <execution>
++ <phase>package</phase>
++ <goals>
++ <goal>shade</goal>
++ </goals>
++ </execution>
++ </executions>
++ <dependencies>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>maven-shade-clojure-transformer</artifactId>
++ <version>${project.version}</version>
++ </dependency>
++ </dependencies>
++ <configuration>
++ <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
++ <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
++ <createDependencyReducedPom>true</createDependencyReducedPom>
++ <minimizeJar>false</minimizeJar>
++ <artifactSet>
++ <includes>
++ <include>org.apache.thrift:*</include>
++ <include>org.apache.storm:*</include>
++ </includes>
++ </artifactSet>
++ <relocations>
++ <relocation>
++ <pattern>org.apache.thrift</pattern>
++ <shadedPattern>org.apache.thrift7</shadedPattern>
++ </relocation>
++ </relocations>
++ <transformers>
++ <transformer />
++ </transformers>
++ <filters>
++ <filter>
++ <artifact>org.apache.thrift:*</artifact>
++ <excludes>
++ <exclude>META-INF/LICENSE.txt</exclude>
++ <exclude>META-INF/NOTICE.txt</exclude>
++ </excludes>
++ </filter>
++ </filters>
++ </configuration>
++ </plugin>
++ </plugins>
++ </build>
++ <dependencies>
++ <dependency>
++ <groupId>org.clojure</groupId>
++ <artifactId>clojure</artifactId>
++ <version>1.5.1</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>clj-time</groupId>
++ <artifactId>clj-time</artifactId>
++ <version>0.4.1</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>compojure</groupId>
++ <artifactId>compojure</artifactId>
++ <version>1.1.3</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>hiccup</groupId>
++ <artifactId>hiccup</artifactId>
++ <version>0.3.6</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>ring</groupId>
++ <artifactId>ring-devel</artifactId>
++ <version>0.3.11</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>ring</groupId>
++ <artifactId>ring-jetty-adapter</artifactId>
++ <version>0.3.11</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.clojure</groupId>
++ <artifactId>tools.logging</artifactId>
++ <version>0.2.3</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.clojure</groupId>
++ <artifactId>math.numeric-tower</artifactId>
++ <version>0.0.1</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.clojure</groupId>
++ <artifactId>tools.cli</artifactId>
++ <version>0.2.4</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.clojure</groupId>
++ <artifactId>tools.nrepl</artifactId>
++ <version>0.2.3</version>
++ <scope>test</scope>
++ <exclusions>
++ <exclusion>
++ <artifactId>clojure</artifactId>
++ <groupId>org.clojure</groupId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>clojure-complete</groupId>
++ <artifactId>clojure-complete</artifactId>
++ <version>0.2.3</version>
++ <scope>test</scope>
++ <exclusions>
++ <exclusion>
++ <artifactId>clojure</artifactId>
++ <groupId>org.clojure</groupId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>commons-io</groupId>
++ <artifactId>commons-io</artifactId>
++ <version>2.4</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.commons</groupId>
++ <artifactId>commons-exec</artifactId>
++ <version>1.1</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>commons-lang</groupId>
++ <artifactId>commons-lang</artifactId>
++ <version>2.5</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.thrift</groupId>
++ <artifactId>libthrift</artifactId>
++ <version>0.7.0</version>
++ <scope>provided</scope>
++ <exclusions>
++ <exclusion>
++ <artifactId>slf4j-api</artifactId>
++ <groupId>org.slf4j</groupId>
++ </exclusion>
++ <exclusion>
++ <artifactId>servlet-api</artifactId>
++ <groupId>javax.servlet</groupId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.curator</groupId>
++ <artifactId>curator-framework</artifactId>
++ <version>2.4.0</version>
++ <scope>compile</scope>
++ <exclusions>
++ <exclusion>
++ <artifactId>log4j</artifactId>
++ <groupId>log4j</groupId>
++ </exclusion>
++ <exclusion>
++ <artifactId>slf4j-log4j12</artifactId>
++ <groupId>org.slf4j</groupId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>com.googlecode.json-simple</groupId>
++ <artifactId>json-simple</artifactId>
++ <version>1.1</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>com.twitter</groupId>
++ <artifactId>carbonite</artifactId>
++ <version>1.4.0</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.yaml</groupId>
++ <artifactId>snakeyaml</artifactId>
++ <version>1.11</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.httpcomponents</groupId>
++ <artifactId>httpclient</artifactId>
++ <version>4.3.3</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>com.googlecode.disruptor</groupId>
++ <artifactId>disruptor</artifactId>
++ <version>2.10.1</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.jgrapht</groupId>
++ <artifactId>jgrapht-core</artifactId>
++ <version>0.9.0</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>com.google.guava</groupId>
++ <artifactId>guava</artifactId>
++ <version>13.0</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>ch.qos.logback</groupId>
++ <artifactId>logback-classic</artifactId>
++ <version>1.0.6</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.slf4j</groupId>
++ <artifactId>log4j-over-slf4j</artifactId>
++ <version>1.6.6</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>io.netty</groupId>
++ <artifactId>netty</artifactId>
++ <version>3.6.3.Final</version>
++ <scope>compile</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.mockito</groupId>
++ <artifactId>mockito-all</artifactId>
++ <version>1.9.5</version>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.clojars.runa</groupId>
++ <artifactId>conjure</artifactId>
++ <version>2.1.3</version>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>reply</groupId>
++ <artifactId>reply</artifactId>
++ <version>0.3.0</version>
++ <scope>provided</scope>
++ <exclusions>
++ <exclusion>
++ <artifactId>cd-client</artifactId>
++ <groupId>org.thnetos</groupId>
++ </exclusion>
++ <exclusion>
++ <artifactId>drawbridge</artifactId>
++ <groupId>com.cemerick</groupId>
++ </exclusion>
++ <exclusion>
++ <artifactId>versioneer</artifactId>
++ <groupId>trptcolin</groupId>
++ </exclusion>
++ <exclusion>
++ <artifactId>sjacket</artifactId>
++ <groupId>org.clojars.trptcolin</groupId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ </dependencies>
++</project>
++
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/pom.xml
----------------------------------------------------------------------
diff --cc storm-core/pom.xml
index 440214d,134eeb8..91cd370
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@@ -20,7 -20,7 +20,11 @@@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
++<<<<<<< HEAD
+ <version>0.9.2-incubating-mt0000</version>
++=======
+ <version>0.9.3-incubating-SNAPSHOT</version>
++>>>>>>> upstream/master
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/22a6ca9b/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --cc storm-dist/binary/pom.xml
index bb1b876,0d97c0b..464ecd7
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@@ -21,7 -21,7 +21,11 @@@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
++<<<<<<< HEAD
+ <version>0.9.2-incubating-mt0000</version>
++=======
+ <version>0.9.3-incubating-SNAPSHOT</version>
++>>>>>>> upstream/master
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
[16/24] git commit: Merge remote-tracking branch 'upstream/master'
Posted by bo...@apache.org.
Merge remote-tracking branch 'upstream/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/62f854a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/62f854a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/62f854a8
Branch: refs/heads/master
Commit: 62f854a81bc643181bc38b1e3dc6d1164a94d874
Parents: a4b26af d448e34
Author: JuDasheng <ju...@meituan.com>
Authored: Fri Jun 13 15:30:09 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Fri Jun 13 15:30:09 2014 +0800
----------------------------------------------------------------------
CHANGELOG.md | 3 +
DEVELOPER.md | 8 +
storm-core/pom.xml | 7 +
.../src/clj/backtype/storm/LocalCluster.clj | 71 +-
storm-core/src/clj/backtype/storm/LocalDRPC.clj | 5 +-
storm-core/src/clj/backtype/storm/bootstrap.clj | 9 +-
storm-core/src/clj/backtype/storm/clojure.clj | 10 +-
storm-core/src/clj/backtype/storm/cluster.clj | 427 +++++------
storm-core/src/clj/backtype/storm/config.clj | 145 ++--
.../src/clj/backtype/storm/daemon/drpc.clj | 135 ++--
storm-core/src/clj/backtype/storm/disruptor.clj | 80 +--
storm-core/src/clj/backtype/storm/event.clj | 49 +-
storm-core/src/clj/backtype/storm/log.clj | 22 +-
.../clj/backtype/storm/process_simulator.clj | 27 +-
storm-core/src/clj/backtype/storm/stats.clj | 289 ++++----
storm-core/src/clj/backtype/storm/testing.clj | 523 +++++++-------
storm-core/src/clj/backtype/storm/testing4j.clj | 58 +-
storm-core/src/clj/backtype/storm/thrift.clj | 255 ++++---
storm-core/src/clj/backtype/storm/timer.clj | 106 +--
storm-core/src/clj/backtype/storm/tuple.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 559 ++++++++-------
storm-core/src/clj/backtype/storm/util.clj | 717 ++++++++++---------
storm-core/src/clj/backtype/storm/zookeeper.clj | 158 ++--
.../netty/NettyRenameThreadFactory.java | 2 +-
.../backtype/storm/task/TopologyContext.java | 11 +
.../backtype/storm/utils/DisruptorQueue.java | 70 +-
.../storm/utils/DisruptorQueueTest.java | 153 ++++
27 files changed, 2167 insertions(+), 1739 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/62f854a8/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
[19/24] git commit: change log message in Could not find metric by
name
Posted by bo...@apache.org.
change log message in Could not find metric by name
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d854ecca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d854ecca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d854ecca
Branch: refs/heads/master
Commit: d854ecca6a259020aeb2a2acf0bba6f1089531aa
Parents: e33023c
Author: JuDasheng <ju...@meituan.com>
Authored: Thu Jun 19 10:43:18 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Thu Jun 19 10:43:18 2014 +0800
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 2 +-
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d854ecca/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 9f4b38e..3f80147 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -103,7 +103,7 @@ public class ShellSpout implements ISpout {
//get metric by name
IMetric iMetric = _context.getRegisteredMetricByName(name);
if (iMetric == null) {
- throw new RuntimeException("Not find metric by name["+name+"] ");
+ throw new RuntimeException("Could not find metric by name["+name+"] ");
}
if ( !(iMetric instanceof IShellMetric)) {
throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d854ecca/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index b02a7fd..5ab327e 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -244,7 +244,7 @@ public class ShellBolt implements IBolt {
//get metric by name
IMetric iMetric = _context.getRegisteredMetricByName(name);
if (iMetric == null) {
- throw new RuntimeException("Not find metric by name["+name+"] ");
+ throw new RuntimeException("Could not find metric by name["+name+"] ");
}
if ( !(iMetric instanceof IShellMetric)) {
throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
[22/24] git commit: resolve the metrics_test hang problem with d2r's
patch
Posted by bo...@apache.org.
resolve the metrics_test hang problem with d2r's patch
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/573c42a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/573c42a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/573c42a6
Branch: refs/heads/master
Commit: 573c42a64885dac9a6a0d4c69a754500b607a8f1
Parents: b004e06
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jul 1 22:30:06 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jul 1 22:30:06 2014 +0800
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/testing.clj | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/573c42a6/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 32f7f88..1a5f53e 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -230,7 +230,10 @@
(log-error t# "Error in cluster")
(throw t#))
(finally
- (kill-local-storm-cluster ~cluster-sym)))))
+ (let [keep-waiting?# (atom true)]
+ (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))
+ (kill-local-storm-cluster ~cluster-sym)
+ (reset! keep-waiting?# false))))))
(defmacro with-simulated-time-local-cluster
[& args]
[11/24] git commit: change version and modify for deploy
Posted by bo...@apache.org.
change version and modify for deploy
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b3191fc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b3191fc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b3191fc5
Branch: refs/heads/master
Commit: b3191fc51cbe4f7fa442b27dd8e113159bc7804f
Parents: fbaf727
Author: JuDasheng <ju...@meituan.com>
Authored: Wed Jun 11 18:06:27 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Wed Jun 11 18:06:27 2014 +0800
----------------------------------------------------------------------
build.sh | 9 +++++++++
examples/storm-starter/pom.xml | 2 +-
external/storm-kafka/pom.xml | 2 +-
pom.xml | 20 +++++++++++++-------
.../maven-shade-clojure-transformer/pom.xml | 4 ++--
storm-core/pom.xml | 2 +-
6 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/build.sh
----------------------------------------------------------------------
diff --git a/build.sh b/build.sh
new file mode 100755
index 0000000..2563c33
--- /dev/null
+++ b/build.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+mvn clean install -DskipTests=true
+
+cd storm-dist/binary
+
+mvn package
+
+cd -
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 11146b6..bc4b4b3 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating-mt0000</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 1210dcd..a8f718e 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating-mt0000</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2c0cbc1..52fe369 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,16 +18,17 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
+ <!--
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
<version>10</version>
</parent>
-
+ -->
<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
- <version>0.9.2-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating-mt0000</version>
<packaging>pom</packaging>
<name>Storm</name>
<description>Distributed and fault-tolerant realtime computation</description>
@@ -254,11 +255,16 @@
</profiles>
<distributionManagement>
- <site>
- <id>storm.maven.website</id>
- <name>Storm Website</name>
- <url>file:///tmp/site</url>
- </site>
+ <repository>
+ <id>nexus-releases</id>
+ <name>nexus-releases</name>
+ <url>http://nexus.sankuai.com:8081/nexus/content/repositories/releases</url>
+ </repository>
+ <snapshotRepository>
+ <id>nexus-snapshots</id>
+ <name>nexus-snapshots</name>
+ <url>http://nexus.sankuai.com:8081/nexus/content/repositories/snapshots</url>
+ </snapshotRepository>
</distributionManagement>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/storm-buildtools/maven-shade-clojure-transformer/pom.xml
----------------------------------------------------------------------
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 7dfa2a2..1c944d5 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating-mt0000</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -36,4 +36,4 @@
<scope>provided</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b3191fc5/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..440214d 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating-mt0000</version>
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>