You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:42 UTC
[04/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
index 8a55ec5..9b86458 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
@@ -35,8 +35,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
/**
*
- * tuple sending object, which get which task should tuple be send to, and
- * update statics
+ * tuple sending object, which get which task should tuple be send to, and update statics
*
* @author yannian/Longda
*
@@ -58,8 +57,7 @@ public class TaskSendTargets {
private boolean isDebuging = false;
private String debugIdStr;
- public TaskSendTargets(Map<Object, Object> _storm_conf, String _component,
- Map<String, Map<String, MkGrouper>> _stream_component_grouper,
+ public TaskSendTargets(Map<Object, Object> _storm_conf, String _component, Map<String, Map<String, MkGrouper>> _stream_component_grouper,
TopologyContext _topology_context, TaskBaseMetric _task_stats) {
this.stormConf = _storm_conf;
this.componentId = _component;
@@ -67,17 +65,14 @@ public class TaskSendTargets {
this.topologyContext = _topology_context;
this.taskStats = _task_stats;
- isDebuging =
- JStormUtils.parseBoolean(stormConf.get(Config.TOPOLOGY_DEBUG),
- false);
+ isDebuging = JStormUtils.parseBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
taskId = topologyContext.getThisTaskId();
debugIdStr = " Emit from " + componentId + ":" + taskId + " ";
}
// direct send tuple to special task
- public java.util.List<Integer> get(Integer out_task_id, String stream,
- List<Object> tuple) {
+ public List<Integer> get(Integer out_task_id, String stream, List<Object> tuple) {
// in order to improve acker's speed, skip checking
// String target_component =
@@ -92,29 +87,26 @@ public class TaskSendTargets {
// }
if (isDebuging) {
- LOG.info(debugIdStr + stream + " to " + out_task_id + ":"
- + tuple.toString());
+ LOG.info(debugIdStr + stream + " to " + out_task_id + ":" + tuple.toString());
}
taskStats.send_tuple(stream, 1);
- java.util.List<Integer> out_tasks = new ArrayList<Integer>();
+ List<Integer> out_tasks = new ArrayList<Integer>();
out_tasks.add(out_task_id);
return out_tasks;
}
// send tuple according to grouping
- public java.util.List<Integer> get(String stream, List<Object> tuple) {
- java.util.List<Integer> out_tasks = new ArrayList<Integer>();
+ public List<Integer> get(String stream, List<Object> tuple) {
+ List<Integer> out_tasks = new ArrayList<Integer>();
// get grouper, then get which task should tuple be sent to.
- Map<String, MkGrouper> componentCrouping =
- streamComponentgrouper.get(stream);
+ Map<String, MkGrouper> componentCrouping = streamComponentgrouper.get(stream);
if (componentCrouping == null) {
// if the target component's parallelism is 0, don't need send to
// them
- LOG.debug("Failed to get Grouper of " + stream + " in "
- + debugIdStr);
+ LOG.debug("Failed to get Grouper of " + stream + " in " + debugIdStr);
return out_tasks;
}
@@ -123,8 +115,7 @@ public class TaskSendTargets {
MkGrouper g = ee.getValue();
if (GrouperType.direct.equals(g.gettype())) {
- throw new IllegalArgumentException(
- "Cannot do regular emit to direct stream");
+ throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
out_tasks.addAll(g.grouper(tuple));
@@ -133,8 +124,7 @@ public class TaskSendTargets {
if (isDebuging) {
- LOG.info(debugIdStr + stream + " to " + out_tasks + ":"
- + tuple.toString());
+ LOG.info(debugIdStr + stream + " to " + out_tasks + ":" + tuple.toString());
}
int num_out_tasks = out_tasks.size();
@@ -144,8 +134,7 @@ public class TaskSendTargets {
return out_tasks;
}
- public void updateStreamCompGrouper(
- Map<String, Map<String, MkGrouper>> streamComponentgrouper) {
+ public void updateStreamCompGrouper(Map<String, Map<String, MkGrouper>> streamComponentgrouper) {
this.streamComponentgrouper = streamComponentgrouper;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
index a6a3406..dc2a2bf 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
@@ -66,8 +66,7 @@ public class TupleInfo implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
index 72c4061..6b0ed2d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
@@ -32,11 +32,9 @@ import com.alibaba.jstorm.task.TaskTransfer;
*/
public class UnanchoredSend {
- public static void send(TopologyContext topologyContext,
- TaskSendTargets taskTargets, TaskTransfer transfer_fn,
- String stream, List<Object> values) {
+ public static void send(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) {
- java.util.List<Integer> tasks = taskTargets.get(stream, values);
+ List<Integer> tasks = taskTargets.get(stream, values);
if (tasks.size() == 0) {
return;
}
@@ -44,8 +42,7 @@ public class UnanchoredSend {
Integer taskId = topologyContext.getThisTaskId();
for (Integer task : tasks) {
- TupleImplExt tup =
- new TupleImplExt(topologyContext, values, taskId, stream);
+ TupleImplExt tup = new TupleImplExt(topologyContext, values, taskId, stream);
tup.setTargetTaskId(task);
transfer_fn.transfer(tup);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
index d0d70be..2475ede 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
@@ -25,4 +25,6 @@ package com.alibaba.jstorm.task.error;
*/
public interface ITaskReportErr {
public void report(Throwable error);
+
+ public void report(String error);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
index d460e5a..1b41037 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
@@ -20,8 +20,7 @@ package com.alibaba.jstorm.task.error;
import com.alibaba.jstorm.callback.RunnableCallback;
/**
- * The callback will be called, when task occur error It just call
- * TaskReportErrorAndDie
+ * The callback will be called, when task occur error It just call TaskReportErrorAndDie
*
* @author yannian
*
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
index e7506b2..bf177f6 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
@@ -34,8 +34,7 @@ public class TaskReportError implements ITaskReportErr {
private String topology_id;
private int task_id;
- public TaskReportError(StormClusterState _storm_cluster_state,
- String _topology_id, int _task_id) {
+ public TaskReportError(StormClusterState _storm_cluster_state, String _topology_id, int _task_id) {
this.zkCluster = _storm_cluster_state;
this.topology_id = _topology_id;
this.task_id = _task_id;
@@ -44,16 +43,23 @@ public class TaskReportError implements ITaskReportErr {
@Override
public void report(Throwable error) {
- LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/"
- + task_id + "\n", error);
+ LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", error);
try {
zkCluster.report_task_error(topology_id, task_id, error);
} catch (Exception e) {
// TODO Auto-generated catch block
- LOG.error("Failed update error to /ZK/taskerrors/" + topology_id
- + "/" + task_id + "\n", e);
+ LOG.error("Failed update error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", e);
}
-
}
+ @Override
+ public void report(String error) {
+
+ LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" + task_id + ": " + error);
+ try {
+ zkCluster.report_task_error(topology_id, task_id, error, null);
+ } catch (Exception e) {
+ LOG.error("Failed update error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
index 4f4eab3..e8596de 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
@@ -29,15 +29,20 @@ public class TaskReportErrorAndDie implements ITaskReportErr {
private ITaskReportErr reporterror;
private RunnableCallback haltfn;
- public TaskReportErrorAndDie(ITaskReportErr _reporterror,
- RunnableCallback _haltfn) {
+ public TaskReportErrorAndDie(ITaskReportErr _reporterror, RunnableCallback _haltfn) {
this.reporterror = _reporterror;
this.haltfn = _haltfn;
}
+ // If throwable error was caught, a error will be reported and current task will be shutdown.
@Override
public void report(Throwable error) {
this.reporterror.report(error);
this.haltfn.run();
}
+
+ @Override
+ public void report(String error) {
+ this.reporterror.report(error);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
index 3f4c18f..7e4495a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
@@ -17,43 +17,40 @@
*/
package com.alibaba.jstorm.task.execute;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
-import backtype.storm.serialization.KryoTupleDeserializer;
+import backtype.storm.Constants;
+import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
-
-import com.alibaba.jstorm.callback.AsyncLoopRunnable;
-import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.timer.RotatingMapTrigger;
+import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
+import com.alibaba.jstorm.task.TaskBatchTransfer;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
-import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
//import com.alibaba.jstorm.message.zeroMq.IRecvConnection;
@@ -67,15 +64,17 @@ import com.lmax.disruptor.dsl.ProducerType;
public class BaseExecutors extends RunnableCallback {
private static Logger LOG = LoggerFactory.getLogger(BaseExecutors.class);
- protected final String component_id;
+ protected final String topologyId;
+ protected final String componentId;
protected final int taskId;
protected final String idStr;
protected Map storm_conf;
-
+
protected final boolean isDebug;
protected TopologyContext userTopologyCtx;
+ protected TopologyContext sysTopologyCtx;
protected TaskBaseMetric task_stats;
protected volatile TaskStatus taskStatus;
@@ -93,74 +92,91 @@ public class BaseExecutors extends RunnableCallback {
protected Task task;
protected long assignmentTs;
protected TaskTransfer taskTransfer;
+
+ protected JStormMetricsReporter metricsReporter;
+
+ protected boolean isFinishInit = false;
+
+ protected RotatingMapTrigger rotatingMapTrigger;
+ protected TaskHeartbeatTrigger taskHbTrigger;
// protected IntervalCheck intervalCheck = new IntervalCheck();
- public BaseExecutors(Task task, TaskTransfer _transfer_fn, Map _storm_conf,
- Map<Integer, DisruptorQueue> innerTaskTransfer,
- TopologyContext topology_context, TopologyContext _user_context,
- TaskBaseMetric _task_stats, TaskStatus taskStatus,
- ITaskReportErr _report_error) {
+ public BaseExecutors(Task task) {
this.task = task;
- this.storm_conf = _storm_conf;
-
- this.userTopologyCtx = _user_context;
- this.task_stats = _task_stats;
- this.taskId = topology_context.getThisTaskId();
- this.innerTaskTransfer = innerTaskTransfer;
- this.component_id = topology_context.getThisComponentId();
- this.idStr = JStormServerUtils.getName(component_id, taskId);
-
- this.taskStatus = taskStatus;
- this.report_error = _report_error;
-
- this.isDebug =
- JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG),
- false);
-
- message_timeout_secs =
- JStormUtils.parseInt(
- storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS),
- 30);
-
- int queue_size =
- Utils.getInt(storm_conf
- .get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
- WaitStrategy waitStrategy =
- (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
- this.exeQueue =
- DisruptorQueue.mkInstance(idStr, ProducerType.MULTI,
- queue_size, waitStrategy);
+ this.storm_conf = task.getStormConf();
+
+ this.userTopologyCtx = task.getUserContext();
+ this.sysTopologyCtx = task.getTopologyContext();
+ this.task_stats = task.getTaskStats();
+ this.taskId = sysTopologyCtx.getThisTaskId();
+ this.innerTaskTransfer = task.getInnerTaskTransfer();
+ this.topologyId = sysTopologyCtx.getTopologyId();
+ this.componentId = sysTopologyCtx.getThisComponentId();
+ this.idStr = JStormServerUtils.getName(componentId, taskId);
+
+ this.taskStatus = task.getTaskStatus();
+ this.report_error = task.getReportErrorDie();
+ this.taskTransfer = task.getTaskTransfer();
+ this.metricsReporter = task.getWorkerData().getMetricsReporter();
+
+ this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
+
+ message_timeout_secs = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
+
+ int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
+ WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
+ this.exeQueue = DisruptorQueue.mkInstance(idStr, ProducerType.MULTI, queue_size, waitStrategy);
this.exeQueue.consumerStarted();
- this.controlQueue = new LinkedBlockingDeque<Object>(8);
+ this.controlQueue = new LinkedBlockingDeque<Object>();
this.registerInnerTransfer(exeQueue);
- QueueGauge exeQueueGauge =
- new QueueGauge(idStr + MetricDef.EXECUTE_QUEUE, exeQueue);
- JStormMetrics.registerTaskGauge(exeQueueGauge, taskId,
- MetricDef.EXECUTE_QUEUE);
- JStormHealthCheck.registerTaskHealthCheck(taskId,
- MetricDef.EXECUTE_QUEUE, exeQueueGauge);
+ QueueGauge exeQueueGauge = new QueueGauge(exeQueue, idStr, MetricDef.EXECUTE_QUEUE);
+ JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_QUEUE, MetricType.GAUGE), new AsmGauge(
+ exeQueueGauge));
+ JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.EXECUTE_QUEUE, exeQueueGauge);
- RotatingMapTrigger rotatingMapTrigger =
- new RotatingMapTrigger(storm_conf, idStr + "_rotating",
- exeQueue);
+ rotatingMapTrigger = new RotatingMapTrigger(storm_conf, idStr + "_rotating", exeQueue);
rotatingMapTrigger.register();
- TaskHeartbeatTrigger taskHbTrigger =
- new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat",
- exeQueue, controlQueue, taskId);
+ taskHbTrigger = new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat", exeQueue, controlQueue, taskId, componentId, sysTopologyCtx, report_error);
taskHbTrigger.register();
-
+
assignmentTs = System.currentTimeMillis();
- this.taskTransfer = _transfer_fn;
+ }
+
+ public void init() throws Exception {
+ // this function will be override by SpoutExecutor or BoltExecutor
+ throw new RuntimeException("Should implement this function");
+ }
+
+ public void initWrapper() {
+ try {
+ LOG.info("{} begin to init", idStr);
+
+ init();
+
+ if (taskId == getMinTaskIdOfWorker()) {
+ metricsReporter.setOutputCollector(getOutputCollector());
+ }
+
+ isFinishInit = true;
+ } catch (Throwable e) {
+ error = e;
+ LOG.error("Init error ", e);
+ report_error.report(e);
+ } finally {
+
+ LOG.info("{} initialization finished", idStr);
+
+ }
}
@Override
public void preRun() {
- WorkerClassLoader.switchThreadContext();
+ WorkerClassLoader.switchThreadContext();
}
@Override
@@ -174,28 +190,11 @@ public class BaseExecutors extends RunnableCallback {
throw new RuntimeException("Should implement this function");
}
- // @Override
- // public Object getResult() {
- // if (taskStatus.isRun()) {
- // return 0;
- // } else if (taskStatus.isPause()) {
- // return 0;
- // } else if (taskStatus.isShutdown()) {
- // this.shutdown();
- // return -1;
- // } else {
- // LOG.info("Unknow TaskStatus, shutdown executing thread of " + idStr);
- // this.shutdown();
- // return -1;
- // }
- // }
-
@Override
public Exception error() {
if (error == null) {
return null;
}
-
return new Exception(error);
}
@@ -213,12 +212,9 @@ public class BaseExecutors extends RunnableCallback {
LOG.info("Registor inner transfer for executor thread of " + idStr);
DisruptorQueue existInnerTransfer = innerTaskTransfer.get(taskId);
if (existInnerTransfer != null) {
- LOG.info("Exist inner task transfer for executing thread of "
- + idStr);
+ LOG.info("Exist inner task transfer for executing thread of " + idStr);
if (existInnerTransfer != disruptorQueue) {
- throw new RuntimeException(
- "Inner task transfer must be only one in executing thread of "
- + idStr);
+ throw new RuntimeException("Inner task transfer must be only one in executing thread of " + idStr);
}
}
innerTaskTransfer.put(taskId, disruptorQueue);
@@ -229,4 +225,12 @@ public class BaseExecutors extends RunnableCallback {
innerTaskTransfer.remove(taskId);
}
+ protected int getMinTaskIdOfWorker() {
+ SortedSet<Integer> tasks = new TreeSet<Integer>(sysTopologyCtx.getThisWorkerTasks());
+ return tasks.first();
+ }
+
+ public Object getOutputCollector() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
index a51d09a..c4ee4ad 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
@@ -28,16 +28,13 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImplExt;
-
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.acker.Acker;
@@ -48,11 +45,18 @@ import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
+import backtype.storm.Config;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.MessageId;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.tuple.TupleImplExt;
+
/**
* bolt output interface, do emit/ack/fail
*
* @author yannian/Longda
- *
*/
public class BoltCollector implements IOutputCollector {
private static Logger LOG = LoggerFactory.getLogger(BoltCollector.class);
@@ -72,62 +76,56 @@ public class BoltCollector implements IOutputCollector {
private Map storm_conf;
private Integer ackerNum;
- private Histogram timer;
+ private AsmMetric timer;
private Random random;
-
- public BoltCollector(int message_timeout_secs, ITaskReportErr report_error,
- TaskSendTargets _send_fn, Map _storm_conf,
- TaskTransfer _transfer_fn, TopologyContext _topology_context,
- Integer task_id, RotatingMap<Tuple, Long> tuple_start_times,
- TaskBaseMetric _task_stats) {
-
- this.rotateTime =
- 1000L * message_timeout_secs / (Acker.TIMEOUT_BUCKET_NUM - 1);
- this.reportError = report_error;
- this.sendTargets = _send_fn;
- this.storm_conf = _storm_conf;
- this.taskTransfer = _transfer_fn;
- this.topologyContext = _topology_context;
- this.task_id = task_id;
- this.task_stats = _task_stats;
-
- this.pending_acks =
- new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
+
+
+ //ITaskReportErr report_error, TaskSendTargets _send_fn, Map _storm_conf, TaskTransfer _transfer_fn,
+ //TopologyContext _topology_context, Integer task_id, TaskBaseMetric _task_stats
+ public BoltCollector(Task task, RotatingMap<Tuple, Long> tuple_start_times, int message_timeout_secs) {
+
+ this.rotateTime = 1000L * message_timeout_secs / (Acker.TIMEOUT_BUCKET_NUM - 1);
+ this.reportError = task.getReportErrorDie();
+ this.sendTargets = task.getTaskSendTargets();
+ this.storm_conf = task.getStormConf();
+ this.taskTransfer = task.getTaskTransfer();
+ this.topologyContext = task.getTopologyContext();
+ this.task_id = task.getTaskId();
+ this.task_stats = task.getTaskStats();
+
+ this.pending_acks = new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
// this.pending_acks = new TimeCacheMap<Tuple,
// Long>(message_timeout_secs,
// Acker.TIMEOUT_BUCKET_NUM);
this.tuple_start_times = tuple_start_times;
- this.ackerNum =
- JStormUtils.parseInt(storm_conf
- .get(Config.TOPOLOGY_ACKER_EXECUTORS));
+ this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
String componentId = topologyContext.getThisComponentId();
- timer =
- JStormMetrics.registerTaskHistogram(task_id,
- MetricDef.COLLECTOR_EMIT_TIME);
+ this.timer =
+ JStormMetrics.registerTaskMetric(
+ MetricUtils.taskMetricName(topologyContext.getTopologyId(), componentId, task_id, MetricDef.COLLECTOR_EMIT_TIME, MetricType.HISTOGRAM),
+ new AsmHistogram());
random = new Random();
random.setSeed(System.currentTimeMillis());
+
}
@Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors,
- List<Object> tuple) {
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
return boltEmit(streamId, anchors, tuple, null);
}
@Override
- public void emitDirect(int taskId, String streamId,
- Collection<Tuple> anchors, List<Object> tuple) {
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
boltEmit(streamId, anchors, tuple, taskId);
}
- private List<Integer> boltEmit(String out_stream_id,
- Collection<Tuple> anchors, List<Object> values, Integer out_task_id) {
- long start = System.nanoTime();
+ private List<Integer> boltEmit(String out_stream_id, Collection<Tuple> anchors, List<Object> values, Integer out_task_id) {
+ final long start = System.nanoTime();
try {
- java.util.List<Integer> out_tasks = null;
+ List<Integer> out_tasks;
if (out_task_id != null) {
out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
} else {
@@ -146,59 +144,49 @@ public class BoltCollector implements IOutputCollector {
lastRotate = now;
}
put_xor(pending_acks, a, edge_id);
- for (Long root_id : a.getMessageId().getAnchorsToIds()
- .keySet()) {
+ for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
put_xor(anchors_to_ids, root_id, edge_id);
}
}
}
MessageId msgid = MessageId.makeId(anchors_to_ids);
- TupleImplExt tupleExt =
- new TupleImplExt(topologyContext, values, task_id,
- out_stream_id, msgid);
+ TupleImplExt tupleExt = new TupleImplExt(topologyContext, values, task_id, out_stream_id, msgid);
tupleExt.setTargetTaskId(t);
taskTransfer.transfer(tupleExt);
-
}
return out_tasks;
} catch (Exception e) {
LOG.error("bolt emit", e);
} finally {
long end = System.nanoTime();
- timer.update((end - start)/1000000.0d);
+ timer.update((end - start) / TimeUtils.NS_PER_US);
}
return new ArrayList<Integer>();
}
@Override
public void ack(Tuple input) {
-
if (ackerNum > 0) {
-
- Long ack_val = Long.valueOf(0);
+ Long ack_val = 0L;
Object pend_val = pending_acks.remove(input);
if (pend_val != null) {
ack_val = (Long) (pend_val);
}
- for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds()
- .entrySet()) {
-
- UnanchoredSend.send(
- topologyContext,
- sendTargets,
- taskTransfer,
- Acker.ACKER_ACK_STREAM_ID,
- JStormUtils.mk_list((Object) e.getKey(),
- JStormUtils.bit_xor(e.getValue(), ack_val)));
+ for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
+ UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_ACK_STREAM_ID,
+ JStormUtils.mk_list((Object) e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val)));
}
}
- Long delta = tuple_time_delta(tuple_start_times, input);
- if (delta != null) {
- task_stats.bolt_acked_tuple(input.getSourceComponent(),
- input.getSourceStreamId(), Double.valueOf(delta));
+ Long startTime = (Long) tuple_start_times.remove(input);
+ if (startTime != null) {
+ Long endTime = System.nanoTime();
+ long latency = (endTime - startTime)/TimeUtils.NS_PER_US;
+ long lifeCycle = (System.currentTimeMillis() - ((TupleExt) input).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
+
+ task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latency, lifeCycle);
}
}
@@ -207,17 +195,12 @@ public class BoltCollector implements IOutputCollector {
// if ackerNum == 0, we can just return
if (ackerNum > 0) {
pending_acks.remove(input);
- for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds()
- .entrySet()) {
- UnanchoredSend.send(topologyContext, sendTargets, taskTransfer,
- Acker.ACKER_FAIL_STREAM_ID,
- JStormUtils.mk_list((Object) e.getKey()));
+ for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
+ UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_FAIL_STREAM_ID, JStormUtils.mk_list((Object) e.getKey()));
}
}
- task_stats.bolt_failed_tuple(input.getSourceComponent(),
- input.getSourceStreamId());
-
+ task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
}
@Override
@@ -226,21 +209,19 @@ public class BoltCollector implements IOutputCollector {
}
// Utility functions, just used here
- public static Long tuple_time_delta(RotatingMap<Tuple, Long> start_times,
- Tuple tuple) {
+ public static Long tuple_time_delta(RotatingMap<Tuple, Long> start_times, Tuple tuple) {
Long start_time = (Long) start_times.remove(tuple);
if (start_time != null) {
- return TimeUtils.time_delta_ms(start_time);
+ return (System.nanoTime() - start_time)/TimeUtils.NS_PER_US;
}
return null;
}
- public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key,
- Long id) {
+ public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key, Long id) {
// synchronized (pending) {
Long curr = pending.get(key);
if (curr == null) {
- curr = Long.valueOf(0);
+ curr = 0L;
}
pending.put(key, JStormUtils.bit_xor(curr, id));
// }
@@ -250,7 +231,7 @@ public class BoltCollector implements IOutputCollector {
// synchronized (pending) {
Long curr = pending.get(key);
if (curr == null) {
- curr = Long.valueOf(0);
+ curr = 0L;
}
pending.put(key, JStormUtils.bit_xor(curr, id));
// }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
index 15adbf2..5c4413e 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
@@ -17,13 +17,6 @@
*/
package com.alibaba.jstorm.task.execute;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.IBolt;
@@ -32,37 +25,48 @@ import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.tuple.TupleImplExt;
+import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.daemon.worker.timer.BackpressureCheckTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TickTupleTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TimerConstants;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.task.Task;
-import com.alibaba.jstorm.task.TaskBaseMetric;
-import com.alibaba.jstorm.task.TaskStatus;
-import com.alibaba.jstorm.task.TaskTransfer;
-import com.alibaba.jstorm.task.TaskBatchTransfer;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.*;
import com.alibaba.jstorm.task.acker.Acker;
+import com.alibaba.jstorm.task.backpressure.BackpressureTrigger;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.error.ITaskReportErr;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import com.lmax.disruptor.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
/**
- *
* BoltExecutor
*
* @author yannian/Longda
- *
*/
public class BoltExecutors extends BaseExecutors implements EventHandler {
private static Logger LOG = LoggerFactory.getLogger(BoltExecutors.class);
@@ -76,89 +80,56 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
// internal outputCollector is BoltCollector
private OutputCollector outputCollector;
- private Histogram boltExeTimer;
-
- public BoltExecutors(Task task, IBolt _bolt, TaskTransfer _transfer_fn,
- Map<Integer, DisruptorQueue> innerTaskTransfer, Map storm_conf,
- TaskSendTargets _send_fn, TaskStatus taskStatus,
- TopologyContext sysTopologyCxt, TopologyContext userTopologyCxt,
- TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
+ private AsmMetric boltExeTimer;
+ private volatile double exeTime;
- super(task, _transfer_fn, storm_conf, innerTaskTransfer,
- sysTopologyCxt, userTopologyCxt, _task_stats, taskStatus,
- _report_error);
+ private BackpressureTrigger backpressureTrigger;
+ private boolean isSystemBolt;
- this.bolt = _bolt;
+ //, IBolt _bolt, TaskTransfer _transfer_fn, Map<Integer, DisruptorQueue> innerTaskTransfer, Map storm_conf,
+ //TaskSendTargets _send_fn, TaskStatus taskStatus, TopologyContext sysTopologyCxt, TopologyContext userTopologyCxt, TaskBaseMetric _task_stats,
+ //ITaskReportErr _report_error, JStormMetricsReporter metricReport
+ public BoltExecutors(Task task) {
- // create TimeCacheMap
+ super(task);
- this.tuple_start_times =
- new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
+ this.bolt = (IBolt)task.getTaskObj();
- this.ackerNum =
- JStormUtils.parseInt(storm_conf
- .get(Config.TOPOLOGY_ACKER_EXECUTORS));
-
- // don't use TimeoutQueue for recv_tuple_queue,
- // then other place should check the queue size
- // TimeCacheQueue.DefaultExpiredCallback<Tuple> logExpireCb = new
- // TimeCacheQueue.DefaultExpiredCallback<Tuple>(
- // idStr);
- // this.recv_tuple_queue = new
- // TimeCacheQueue<Tuple>(message_timeout_secs,
- // TimeCacheQueue.DEFAULT_NUM_BUCKETS, logExpireCb);
+ // create TimeCacheMap
+ this.tuple_start_times = new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
+ this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
// create BoltCollector
- IOutputCollector output_collector =
- new BoltCollector(message_timeout_secs, _report_error,
- _send_fn, storm_conf, _transfer_fn, sysTopologyCxt,
- taskId, tuple_start_times, _task_stats);
-
+ IOutputCollector output_collector = new BoltCollector(task, tuple_start_times, message_timeout_secs);
outputCollector = new OutputCollector(output_collector);
+ taskHbTrigger.setBoltOutputCollector(outputCollector);
- boltExeTimer =
- JStormMetrics.registerTaskHistogram(taskId,
- MetricDef.EXECUTE_TIME);
+ String metricName = MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM);
+ this.boltExeTimer = JStormMetrics.registerTaskMetric(metricName, new AsmHistogram());
- Object tickFrequence =
- storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ Object tickFrequence = storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
if (tickFrequence != null) {
Integer frequence = JStormUtils.parseInt(tickFrequence);
- TickTupleTrigger tickTupleTrigger =
- new TickTupleTrigger(sysTopologyCxt, frequence, idStr
- + Constants.SYSTEM_TICK_STREAM_ID, exeQueue);
+ TickTupleTrigger tickTupleTrigger = new TickTupleTrigger(sysTopologyCtx, frequence, idStr + Constants.SYSTEM_TICK_STREAM_ID, exeQueue);
tickTupleTrigger.register();
}
-
- if (ConfigExtension.isTaskBatchTuple(storm_conf)) {
- TaskBatchFlushTrigger batchFlushTrigger =
- new TaskBatchFlushTrigger(5, idStr
- + Constants.SYSTEM_COMPONENT_ID,
- (TaskBatchTransfer) _transfer_fn);
- batchFlushTrigger.register(TimeUnit.MILLISECONDS);
- }
-
- try {
- // do prepare
- WorkerClassLoader.switchThreadContext();
-
- // Method method = IBolt.class.getMethod("prepare", new Class[]
- // {Map.class, TopologyContext.class,
- // OutputCollector.class});
- // method.invoke(bolt, new Object[] {storm_conf, userTopologyCxt,
- // outputCollector});
- bolt.prepare(storm_conf, userTopologyCtx, outputCollector);
-
- } catch (Throwable e) {
- error = e;
- LOG.error("bolt prepare error ", e);
- report_error.report(e);
- } finally {
- WorkerClassLoader.restoreThreadContext();
+
+
+ isSystemBolt = Common.isSystemComponent(componentId);
+ if (isSystemBolt == false) {
+ backpressureTrigger = new BackpressureTrigger(task, this, storm_conf, outputCollector);
+ int backpressureCheckFrequence = ConfigExtension.getBackpressureCheckIntervl(storm_conf);
+ BackpressureCheckTrigger backpressureCheckTrigger =
+ new BackpressureCheckTrigger(30, backpressureCheckFrequence, idStr + " backpressure check trigger", backpressureTrigger);
+ backpressureCheckTrigger.register(TimeUnit.MILLISECONDS);
}
LOG.info("Successfully create BoltExecutors " + idStr);
-
+ }
+
+ @Override
+ public void init() {
+ bolt.prepare(storm_conf, userTopologyCtx, outputCollector);
}
@Override
@@ -168,10 +139,14 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
@Override
public void run() {
+ if (isFinishInit == false) {
+ initWrapper();
+ }
while (taskStatus.isShutdown() == false) {
try {
+ //if (backpressureTrigger != null)
+ // backpressureTrigger.checkAndTrigger();
exeQueue.consumeBatchWhenAvailable(this);
-
processControlEvent();
} catch (Throwable e) {
if (taskStatus.isShutdown() == false) {
@@ -182,20 +157,19 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
}
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
-
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
if (event == null) {
return;
}
long start = System.nanoTime();
-
try {
if (event instanceof Tuple) {
+ processControlEvent();
processTupleEvent((Tuple) event);
} else if (event instanceof BatchTuple) {
for (Tuple tuple : ((BatchTuple) event).getTuples()) {
+ processControlEvent();
processTupleEvent((Tuple) tuple);
}
} else if (event instanceof TimerTrigger.TimerEvent) {
@@ -205,18 +179,21 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
}
} finally {
long end = System.nanoTime();
- boltExeTimer.update((end - start) / 1000000.0d);
+ exeTime = (end - start) / TimeUtils.NS_PER_US;
+ boltExeTimer.update(exeTime);
}
}
private void processTupleEvent(Tuple tuple) {
- task_stats.recv_tuple(tuple.getSourceComponent(),
- tuple.getSourceStreamId());
-
- tuple_start_times.put(tuple, System.currentTimeMillis());
+ task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
+ tuple_start_times.put(tuple, System.nanoTime());
try {
- bolt.execute(tuple);
+ if (isSystemBolt == false && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
+ backpressureTrigger.handle(tuple);
+ } else {
+ bolt.execute(tuple);
+ }
} catch (Throwable e) {
error = e;
LOG.error("bolt execute error ", e);
@@ -226,11 +203,12 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
if (ackerNum == 0) {
// only when acker is disable
// get tuple process latency
- Long start_time = (Long) tuple_start_times.remove(tuple);
- if (start_time != null) {
- Long delta = TimeUtils.time_delta_ms(start_time);
- task_stats.bolt_acked_tuple(tuple.getSourceComponent(),
- tuple.getSourceStreamId(), Double.valueOf(delta));
+ Long startTime = (Long) tuple_start_times.remove(tuple);
+ if (startTime != null) {
+ Long endTime = System.nanoTime();
+ long latency = (endTime - startTime)/TimeUtils.NS_PER_US;
+ long lifeCycle = (System.currentTimeMillis() - ((TupleExt) tuple).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
+ task_stats.bolt_acked_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), latency, lifeCycle);
}
}
}
@@ -244,8 +222,7 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
// only when acker is enable
for (Entry<Tuple, Long> entry : timeoutMap.entrySet()) {
Tuple input = entry.getKey();
- task_stats.bolt_failed_tuple(input.getSourceComponent(),
- input.getSourceStreamId());
+ task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
}
}
break;
@@ -262,13 +239,11 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
break;
}
case TimerConstants.TASK_HEARTBEAT: {
- Integer taskId = (Integer) event.getMsg();
- TaskHeartbeatRunable.updateTaskHbStats(taskId, task);
+ taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
break;
}
default: {
- LOG.warn("Receive unsupported timer event, opcode="
- + event.getOpCode());
+ LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode());
break;
}
}
@@ -282,9 +257,17 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
processTimerEvent((TimerTrigger.TimerEvent) event);
LOG.debug("Received one event from control queue");
} else {
- LOG.warn("Received unknown control event, "
- + event.getClass().getName());
+ LOG.warn("Received unknown control event, " + event.getClass().getName());
}
}
}
+
+ public double getExecuteTime() {
+ return exeTime;
+ }
+
+ @Override
+ public Object getOutputCollector() {
+ return outputCollector;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
index d554efc..d3bc53a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
@@ -23,10 +23,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.spout.ISpout;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
import com.alibaba.jstorm.client.spout.IAckValueSpout;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.comm.TupleInfo;
+import com.alibaba.jstorm.utils.TimeUtils;
/**
* The action after spout receive one ack tuple
@@ -38,15 +41,15 @@ public class AckSpoutMsg implements IAckMsg {
private static Logger LOG = LoggerFactory.getLogger(AckSpoutMsg.class);
private ISpout spout;
+ private Tuple tuple;
+ private TupleInfo tupleInfo;
private Object msgId;
private String stream;
- private long timeStamp;
private List<Object> values;
private TaskBaseMetric task_stats;
private boolean isDebug = false;
- public AckSpoutMsg(ISpout _spout, TupleInfo tupleInfo,
- TaskBaseMetric _task_stats, boolean _isDebug) {
+ public AckSpoutMsg(ISpout _spout, Tuple tuple, TupleInfo tupleInfo, TaskBaseMetric _task_stats, boolean _isDebug) {
this.task_stats = _task_stats;
@@ -55,10 +58,11 @@ public class AckSpoutMsg implements IAckMsg {
this.msgId = tupleInfo.getMessageId();
this.stream = tupleInfo.getStream();
- if (tupleInfo.getTimestamp() != 0) {
- this.timeStamp = System.currentTimeMillis() - tupleInfo.getTimestamp();
- }
+
this.values = tupleInfo.getValues();
+
+ this.tuple = tuple;
+ this.tupleInfo = tupleInfo;
}
public void run() {
@@ -73,7 +77,15 @@ public class AckSpoutMsg implements IAckMsg {
spout.ack(msgId);
}
- task_stats.spout_acked_tuple(stream, timeStamp);
+ long latency = 0, lifeCycle = 0;
+ if (tupleInfo.getTimestamp() != 0) {
+ long endTime = System.nanoTime();
+ latency = (endTime - tupleInfo.getTimestamp())/TimeUtils.NS_PER_US;
+ if (tuple != null && tuple instanceof TupleExt) {
+ lifeCycle = (System.currentTimeMillis() - ((TupleExt) tuple).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
+ }
+ }
+ task_stats.spout_acked_tuple(stream, latency, lifeCycle);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
index 7b5d37b..f570a74 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
@@ -40,8 +40,7 @@ public class FailSpoutMsg implements IAckMsg {
private TaskBaseMetric task_stats;
private boolean isDebug = false;
- public FailSpoutMsg(Object id, ISpout _spout, TupleInfo _tupleInfo,
- TaskBaseMetric _task_stats, boolean _isDebug) {
+ public FailSpoutMsg(Object id, ISpout _spout, TupleInfo _tupleInfo, TaskBaseMetric _task_stats, boolean _isDebug) {
this.id = id;
this.spout = _spout;
this.tupleInfo = _tupleInfo;
@@ -63,8 +62,7 @@ public class FailSpoutMsg implements IAckMsg {
task_stats.spout_failed_tuple(tupleInfo.getStream());
if (isDebug) {
- LOG.info("Failed message rootId: {}, messageId:{} : {}", id,
- msg_id, tupleInfo.getValues().toString());
+ LOG.info("Failed message rootId: {}, messageId:{} : {}", id, msg_id, tupleInfo.getValues().toString());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
index a74079f..8edd3cc 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
@@ -17,19 +17,13 @@
*/
package com.alibaba.jstorm.task.execute.spout;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
-
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskStatus;
@@ -39,35 +33,36 @@ import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.RotatingMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* spout executor
- *
+ * <p/>
* All spout actions will be done here
*
* @author yannian/Longda
- *
*/
public class MultipleThreadSpoutExecutors extends SpoutExecutors {
- private static Logger LOG = LoggerFactory
- .getLogger(MultipleThreadSpoutExecutors.class);
-
- public MultipleThreadSpoutExecutors(Task task,
- backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn,
- Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf,
- TaskSendTargets sendTargets, TaskStatus taskStatus,
- TopologyContext topology_context, TopologyContext _user_context,
- TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
- super(task, _spout, _transfer_fn, innerTaskTransfer, _storm_conf,
- sendTargets, taskStatus, topology_context, _user_context,
- _task_stats, _report_error);
-
- ackerRunnableThread = new AsyncLoopThread(new AckerRunnable());
- pending =
- new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM,
- null, false);
-
- super.prepare(sendTargets, _transfer_fn, topology_context);
+ private static Logger LOG = LoggerFactory.getLogger(MultipleThreadSpoutExecutors.class);
+
+ public MultipleThreadSpoutExecutors(Task task) {
+ super(task);
+
+ ackerRunnableThread = new AsyncLoopThread(new AckerRunnable(), false, Thread.NORM_PRIORITY, false);
+ }
+
+ public void mkPending() {
+ pending = new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, null, false);
+ }
+
+ @Override
+ public void init() throws Exception {
+ super.init();
+ ackerRunnableThread.start();
}
@Override
@@ -75,11 +70,14 @@ public class MultipleThreadSpoutExecutors extends SpoutExecutors {
return idStr + "-" + MultipleThreadSpoutExecutors.class.getSimpleName();
}
- @Override
- public void run() {
+ @Override
+ public void run() {
+ if (isFinishInit == false) {
+ initWrapper();
+ }
- super.nextTuple();
- }
+ super.nextTuple();
+ }
class AckerRunnable extends RunnableCallback {
@@ -117,7 +115,7 @@ public class MultipleThreadSpoutExecutors extends SpoutExecutors {
}
}
-
+
LOG.info("Successfully shutdown Spout's acker thread " + idStr);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
index 9e4dd21..144b041 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
@@ -17,15 +17,9 @@
*/
package com.alibaba.jstorm.task.execute.spout;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.WorkerClassLoader;
-
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskStatus;
@@ -35,35 +29,30 @@ import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.RotatingMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
/**
* spout executor
- *
+ * <p/>
* All spout actions will be done here
*
* @author yannian/Longda
- *
*/
public class SingleThreadSpoutExecutors extends SpoutExecutors {
- private static Logger LOG = LoggerFactory
- .getLogger(SingleThreadSpoutExecutors.class);
+ private static Logger LOG = LoggerFactory.getLogger(SingleThreadSpoutExecutors.class);
- public SingleThreadSpoutExecutors(Task task,
- backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn,
- Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf,
- TaskSendTargets sendTargets, TaskStatus taskStatus,
- TopologyContext topology_context, TopologyContext _user_context,
- TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
- super(task, _spout, _transfer_fn, innerTaskTransfer, _storm_conf,
- sendTargets, taskStatus, topology_context, _user_context,
- _task_stats, _report_error);
+ public SingleThreadSpoutExecutors(Task task) {
+ super(task);
- // sending Tuple's TimeCacheMap
- pending =
- new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM,
- null, true);
-
- super.prepare(sendTargets, _transfer_fn, topology_context);
+ }
+
+ @Override
+ public void mkPending() {
+ // sending Tuple's TimeCacheMap
+ pending = new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, null, true);
}
@Override
@@ -73,10 +62,14 @@ public class SingleThreadSpoutExecutors extends SpoutExecutors {
@Override
public void run() {
+ if (isFinishInit == false ) {
+ initWrapper();
+ }
+
executeEvent();
super.nextTuple();
-
+
processControlEvent();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
index d913a9e..baf709f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
@@ -25,16 +25,12 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.TupleImplExt;
-import backtype.storm.utils.DisruptorQueue;
-
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.acker.Acker;
@@ -44,12 +40,20 @@ import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeOutMap;
+import com.alibaba.jstorm.utils.TimeUtils;
+
+import backtype.storm.Config;
+import backtype.storm.spout.ISpout;
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.MessageId;
+import backtype.storm.tuple.TupleImplExt;
+import backtype.storm.utils.DisruptorQueue;
/**
* spout collector, sending tuple through this Object
*
* @author yannian/Longda
- *
*/
public class SpoutCollector implements ISpoutOutputCollector {
private static Logger LOG = LoggerFactory.getLogger(SpoutCollector.class);
@@ -71,64 +75,53 @@ public class SpoutCollector implements ISpoutOutputCollector {
private Integer ackerNum;
private boolean isDebug = false;
- private Histogram emitTotalTimer;
+ private AsmHistogram emitTotalTimer;
Random random;
- public SpoutCollector(Integer task_id, backtype.storm.spout.ISpout spout,
- TaskBaseMetric task_stats, TaskSendTargets sendTargets,
- Map _storm_conf, TaskTransfer _transfer_fn,
- TimeOutMap<Long, TupleInfo> pending,
- TopologyContext topology_context,
- DisruptorQueue disruptorAckerQueue, ITaskReportErr _report_error) {
- this.sendTargets = sendTargets;
- this.storm_conf = _storm_conf;
- this.transfer_fn = _transfer_fn;
+ //Integer task_id, backtype.storm.spout.ISpout spout, TaskBaseMetric task_stats, TaskSendTargets sendTargets, Map _storm_conf,
+ //TaskTransfer _transfer_fn, TimeOutMap<Long, TupleInfo> pending, TopologyContext topology_context, DisruptorQueue disruptorAckerQueue,
+ //ITaskReportErr _report_error
+ public SpoutCollector(Task task, TimeOutMap<Long, TupleInfo> pending, DisruptorQueue disruptorAckerQueue) {
+ this.sendTargets = task.getTaskSendTargets();
+ this.storm_conf = task.getStormConf();
+ this.transfer_fn = task.getTaskTransfer();
this.pending = pending;
- this.topology_context = topology_context;
+ this.topology_context = task.getTopologyContext();
this.disruptorAckerQueue = disruptorAckerQueue;
- this.task_stats = task_stats;
- this.spout = spout;
- this.task_id = task_id;
- this.report_error = _report_error;
+ this.task_stats = task.getTaskStats();
+ this.spout = (ISpout)task.getTaskObj();
+ this.task_id = task.getTaskId();
+ this.report_error = task.getReportErrorDie();
- ackerNum =
- JStormUtils.parseInt(storm_conf
- .get(Config.TOPOLOGY_ACKER_EXECUTORS));
- isDebug =
- JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG),
- false);
+ ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+ isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
random = new Random();
random.setSeed(System.currentTimeMillis());
String componentId = topology_context.getThisComponentId();
emitTotalTimer =
- JStormMetrics.registerTaskHistogram(task_id,
- MetricDef.COLLECTOR_EMIT_TIME);
-
+ (AsmHistogram) JStormMetrics
+ .registerTaskMetric(MetricUtils.taskMetricName(topology_context.getTopologyId(), componentId, task_id, MetricDef.COLLECTOR_EMIT_TIME,
+ MetricType.HISTOGRAM), new AsmHistogram());
}
@Override
- public List<Integer> emit(String streamId, List<Object> tuple,
- Object messageId) {
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
return sendSpoutMsg(streamId, tuple, messageId, null);
}
@Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple,
- Object messageId) {
+ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
sendSpoutMsg(streamId, tuple, messageId, taskId);
}
- private List<Integer> sendSpoutMsg(String out_stream_id,
- List<Object> values, Object message_id, Integer out_task_id) {
-
- long startTime = System.nanoTime();
-
+ private List<Integer> sendSpoutMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) {
+ final long startTime = System.nanoTime();
try {
- java.util.List<Integer> out_tasks = null;
+ List<Integer> out_tasks;
if (out_task_id != null) {
out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
} else {
@@ -148,7 +141,7 @@ public class SpoutCollector implements ISpoutOutputCollector {
// when duplicate root_id, it will miss call ack/fail
Long root_id = MessageId.generateId(random);
if (needAck) {
- while (pending.containsKey(root_id) == true) {
+ while (pending.containsKey(root_id)) {
root_id = MessageId.generateId(random);
}
}
@@ -163,30 +156,23 @@ public class SpoutCollector implements ISpoutOutputCollector {
msgid = MessageId.makeUnanchored();
}
- TupleImplExt tp =
- new TupleImplExt(topology_context, values, task_id,
- out_stream_id, msgid);
+ TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
tp.setTargetTaskId(t);
transfer_fn.transfer(tp);
-
}
if (needAck) {
-
TupleInfo info = new TupleInfo();
info.setStream(out_stream_id);
info.setValues(values);
info.setMessageId(message_id);
- info.setTimestamp(System.currentTimeMillis());
+ info.setTimestamp(System.nanoTime());
pending.putHead(root_id, info);
- List<Object> ackerTuple =
- JStormUtils.mk_list((Object) root_id,
- JStormUtils.bit_xor_vals(ackSeq), task_id);
+ List<Object> ackerTuple = JStormUtils.mk_list((Object) root_id, JStormUtils.bit_xor_vals(ackSeq), task_id);
- UnanchoredSend.send(topology_context, sendTargets, transfer_fn,
- Acker.ACKER_INIT_STREAM_ID, ackerTuple);
+ UnanchoredSend.send(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple);
} else if (message_id != null) {
TupleInfo info = new TupleInfo();
@@ -195,23 +181,20 @@ public class SpoutCollector implements ISpoutOutputCollector {
info.setMessageId(message_id);
info.setTimestamp(0);
- AckSpoutMsg ack =
- new AckSpoutMsg(spout, info, task_stats, isDebug);
+ AckSpoutMsg ack = new AckSpoutMsg(spout, null, info, task_stats, isDebug);
ack.run();
-
}
return out_tasks;
} finally {
long endTime = System.nanoTime();
- emitTotalTimer.update((endTime - startTime)/1000000.0d);
+ emitTotalTimer.update((endTime - startTime) / TimeUtils.NS_PER_US);
}
}
@Override
public void reportError(Throwable error) {
- // TODO Auto-generated method stub
report_error.report(error);
}