You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:43 UTC
[05/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
index db2e990..7ef7144 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.task.TaskReceiver.DeserializeRunnable;
import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.BatchTuple;
@@ -32,27 +33,20 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
public class TaskBatchReceiver extends TaskReceiver {
- private static Logger LOG = LoggerFactory
- .getLogger(TaskBatchReceiver.class);
+ private static Logger LOG = LoggerFactory.getLogger(TaskBatchReceiver.class);
- public TaskBatchReceiver(Task task, int taskId, Map stormConf,
- TopologyContext topologyContext,
- Map<Integer, DisruptorQueue> innerTaskTransfer,
+ public TaskBatchReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer,
TaskStatus taskStatus, String taskName) {
- super(task, taskId, stormConf, topologyContext, innerTaskTransfer,
- taskStatus, taskName);
+ super(task, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
}
@Override
protected void setDeserializeThread() {
- this.deserializeThread =
- new AsyncLoopThread(new DeserializeBatchRunnable(
- deserializeQueue, innerTaskTransfer.get(taskId)));
+ this.deserializeThread = new AsyncLoopThread(new DeserializeBatchRunnable(deserializeQueue, innerTaskTransfer.get(taskId)));
}
public class DeserializeBatchRunnable extends DeserializeRunnable {
- public DeserializeBatchRunnable(DisruptorQueue deserializeQueue,
- DisruptorQueue exeQueue) {
+ public DeserializeBatchRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) {
super(deserializeQueue, exeQueue);
}
@@ -83,14 +77,11 @@ public class TaskBatchReceiver extends TaskReceiver {
return tuple;
} catch (Throwable e) {
if (taskStatus.isShutdown() == false) {
- LOG.error(
- idStr + " recv thread error "
- + JStormUtils.toPrintableString(ser_msg)
- + "\n", e);
+ LOG.error(idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
}
} finally {
long end = System.nanoTime();
- deserializeTimer.update((end - start)/1000000.0d);
+ deserializeTimer.update((end - start) / TimeUtils.NS_PER_US);
}
return null;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
index e10fe96..07d7cbb 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
@@ -20,6 +20,7 @@ package com.alibaba.jstorm.task;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,13 +28,15 @@ import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.worker.WorkerData;
+import com.alibaba.jstorm.daemon.worker.timer.TaskBatchCheckTrigger;
+import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
+import com.alibaba.jstorm.utils.EventSampler;
+import com.alibaba.jstorm.utils.Pair;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.BatchTuple;
+import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.TupleExt;
-import backtype.storm.utils.DisruptorQueue;
/**
* Batch Tuples, then send out
@@ -43,62 +46,84 @@ import backtype.storm.utils.DisruptorQueue;
*/
public class TaskBatchTransfer extends TaskTransfer {
- private static Logger LOG = LoggerFactory
- .getLogger(TaskBatchTransfer.class);
-
+ private static Logger LOG = LoggerFactory.getLogger(TaskBatchTransfer.class);
+ protected static final double BATCH_SIZE_THRESHOLD = 2.0;
+ protected static final int BATCH_FLUSH_INTERVAL_MS = 5;
+ protected static final int BATCH_CHECK_INTERVAL_S = 3600;
+ protected static final int BATCH_EVENT_SAMPLER_INTERVAL_S = 4 * 240;
+
private Map<Integer, BatchTuple> batchMap;
+ private final int maxBatchSize;
private int batchSize;
private Object lock = new Object();
+ private EventSampler eventSampler = null;
- public TaskBatchTransfer(Task task, String taskName,
- KryoTupleSerializer serializer, TaskStatus taskStatus,
- WorkerData workerData) {
+ public TaskBatchTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
super(task, taskName, serializer, taskStatus, workerData);
batchMap = new HashMap<Integer, BatchTuple>();
- batchSize =
- ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf());
+ maxBatchSize = ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf());
+
+
+ TaskBatchFlushTrigger batchFlushTrigger = new TaskBatchFlushTrigger(BATCH_FLUSH_INTERVAL_MS, taskName, this);
+ batchFlushTrigger.register(TimeUnit.MILLISECONDS);
+
+ TaskBatchCheckTrigger batchCheckTrigger = new TaskBatchCheckTrigger(BATCH_CHECK_INTERVAL_S, taskName, this);
+ batchCheckTrigger.register();
+
+ startCheck();
}
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ LOG.info(taskName + " set batch size as " + batchSize);
+ }
@Override
protected AsyncLoopThread setupSerializeThread() {
return new AsyncLoopThread(new TransferBatchRunnable());
}
+
+ public void startCheck() {
+ eventSampler = new EventSampler(BATCH_EVENT_SAMPLER_INTERVAL_S);
+ setBatchSize(maxBatchSize);
+ LOG.info("Start check batch size, task of " + taskName);
+ }
+
+ public void stopCheck() {
+ eventSampler = null;
+ LOG.info("Stop check batch size, task of " + taskName);
+ }
- @Override
- public void transfer(TupleExt tuple) {
- int targetTaskid = tuple.getTargetTaskId();
- synchronized (lock) {
- BatchTuple batch = getBatchTuple(targetTaskid);
+ @Override
+ public void push(int taskId, TupleExt tuple) {
+ synchronized (lock) {
+ BatchTuple batch = getBatchTuple(taskId);
- batch.addToBatch(tuple);
- if (batch.isBatchFull()) {
- pushToQueue(targetTaskid, batch);
- }
- }
- }
+ batch.addToBatch(tuple);
+ if (batch.isBatchFull()) {
+ serializeQueue.publish(batch);
+ batchMap.put(taskId, new BatchTuple(taskId, batchSize));
+ }
+ }
+
+ }
public void flush() {
+ Map<Integer, BatchTuple> oldBatchMap = null;
synchronized (lock) {
- for (Entry<Integer, BatchTuple> entry : batchMap.entrySet()) {
- int taskId = entry.getKey();
- BatchTuple batch = entry.getValue();
- if (batch != null && batch.currBatchSize() > 0) {
- pushToQueue(taskId, batch);
- }
+ oldBatchMap = batchMap;
+ batchMap = new HashMap<Integer, BatchTuple>();
+ }
+
+ for (Entry<Integer, BatchTuple> entry : oldBatchMap.entrySet()) {
+ BatchTuple batch = entry.getValue();
+ if (batch != null && batch.currBatchSize() > 0) {
+ serializeQueue.publish(batch);
}
}
}
- private void pushToQueue(int targetTaskid, BatchTuple batch) {
- DisruptorQueue exeQueue = innerTaskTransfer.get(targetTaskid);
- if (exeQueue != null) {
- exeQueue.publish(batch);
- } else {
- serializeQueue.publish(batch);
- }
- resetBatchTuple(targetTaskid);
- }
private BatchTuple getBatchTuple(int targetTaskId) {
BatchTuple ret = batchMap.get(targetTaskId);
@@ -109,33 +134,27 @@ public class TaskBatchTransfer extends TaskTransfer {
return ret;
}
- private void resetBatchTuple(int targetTaskId) {
- batchMap.put(targetTaskId, null);
- }
protected class TransferBatchRunnable extends TransferRunnable {
- @Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
-
- if (event == null) {
- return;
- }
-
- long start = System.currentTimeMillis();
- try {
- BatchTuple tuple = (BatchTuple) event;
- int taskid = tuple.getTargetTaskId();
- byte[] tupleMessage = serializer.serializeBatch(tuple);
- TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
- IConnection conn = getConnection(taskid);
- if (conn != null)
- conn.send(taskMessage);
- } finally {
- long end = System.currentTimeMillis();
- timer.update(end - start);
- }
+ public byte[] serialize(ITupleExt tuple) {
+ BatchTuple batchTuple = (BatchTuple)tuple;
+ if (eventSampler != null) {
+ Pair<Integer, Double> result = eventSampler.avgCheck(batchTuple.currBatchSize());
+ if (result != null) {
+ Double avgBatchSize = result.getSecond();
+ LOG.info(taskName + " batch average size is " + avgBatchSize);
+ if (avgBatchSize < BATCH_SIZE_THRESHOLD) {
+ LOG.info("Due to average size is small, so directly reset batch size as 1");
+ // set the batch size to 1
+ // so that transfer can send each tuple directly, without waiting for the flush interval
+ setBatchSize(1);
+ }
+ stopCheck();
+ }
+
+ }
+ return serializer.serializeBatch(batchTuple);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
index 0eb1c4b..ddfe63d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
@@ -54,11 +54,8 @@ public class TaskInfo implements Serializable {
@Override
public boolean equals(Object assignment) {
- if (assignment instanceof TaskInfo
- && ((TaskInfo) assignment).getComponentId().equals(
- getComponentId())
- && ((TaskInfo) assignment).getComponentType().equals(
- componentType)) {
+ if (assignment instanceof TaskInfo && ((TaskInfo) assignment).getComponentId().equals(getComponentId())
+ && ((TaskInfo) assignment).getComponentType().equals(componentType)) {
return true;
}
return false;
@@ -66,13 +63,11 @@ public class TaskInfo implements Serializable {
@Override
public int hashCode() {
- return this.getComponentId().hashCode()
- + this.getComponentType().hashCode();
+ return this.getComponentId().hashCode() + this.getComponentType().hashCode();
}
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
index ad32ceb..230ba16 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
@@ -17,32 +17,32 @@
*/
package com.alibaba.jstorm.task;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.esotericsoftware.kryo.KryoException;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
public class TaskReceiver {
private static Logger LOG = LoggerFactory.getLogger(TaskReceiver.class);
@@ -58,13 +58,11 @@ public class TaskReceiver {
protected DisruptorQueue deserializeQueue;
protected KryoTupleDeserializer deserializer;
protected AsyncLoopThread deserializeThread;
- protected Histogram deserializeTimer;
+ protected AsmHistogram deserializeTimer;
protected TaskStatus taskStatus;
- public TaskReceiver(Task task, int taskId, Map stormConf,
- TopologyContext topologyContext,
- Map<Integer, DisruptorQueue> innerTaskTransfer,
+ public TaskReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer,
TaskStatus taskStatus, String taskName) {
this.task = task;
this.taskId = taskId;
@@ -77,34 +75,24 @@ public class TaskReceiver {
this.isDebugRecv = ConfigExtension.isTopologyDebugRecvTuple(stormConf);
- int queueSize =
- JStormUtils
- .parseInt(
- stormConf
- .get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE),
- 256);
-
- WaitStrategy waitStrategy =
- (WaitStrategy) JStormUtils
- .createDisruptorWaitStrategy(stormConf);
- this.deserializeQueue =
- DisruptorQueue.mkInstance("TaskDeserialize",
- ProducerType.MULTI, queueSize, waitStrategy);
+ int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
+
+ WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
+ this.deserializeQueue = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.MULTI, queueSize, waitStrategy);
setDeserializeThread();
- this.deserializer =
- new KryoTupleDeserializer(stormConf, topologyContext);
+ this.deserializer = new KryoTupleDeserializer(stormConf, topologyContext);
+
+ String topologyId = topologyContext.getTopologyId();
+ String component = topologyContext.getThisComponentId();
deserializeTimer =
- JStormMetrics.registerTaskHistogram(taskId,
- MetricDef.DESERIALIZE_TIME);
-
- QueueGauge deserializeQueueGauge =
- new QueueGauge(idStr + MetricDef.DESERIALIZE_QUEUE,
- deserializeQueue);
- JStormMetrics.registerTaskGauge(deserializeQueueGauge, taskId,
- MetricDef.DESERIALIZE_QUEUE);
- JStormHealthCheck.registerTaskHealthCheck(taskId,
- MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge);
+ (AsmHistogram) JStormMetrics.registerTaskMetric(
+ MetricUtils.taskMetricName(topologyId, component, taskId, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
+
+ QueueGauge deserializeQueueGauge = new QueueGauge(deserializeQueue, idStr, MetricDef.DESERIALIZE_QUEUE);
+ JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, component, taskId, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE),
+ new AsmGauge(deserializeQueueGauge));
+ JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge);
}
public AsyncLoopThread getDeserializeThread() {
@@ -112,9 +100,7 @@ public class TaskReceiver {
}
protected void setDeserializeThread() {
- this.deserializeThread =
- new AsyncLoopThread(new DeserializeRunnable(deserializeQueue,
- innerTaskTransfer.get(taskId)));
+ this.deserializeThread = new AsyncLoopThread(new DeserializeRunnable(deserializeQueue, innerTaskTransfer.get(taskId)));
}
public DisruptorQueue getDeserializeQueue() {
@@ -126,8 +112,7 @@ public class TaskReceiver {
DisruptorQueue deserializeQueue;
DisruptorQueue exeQueue;
- DeserializeRunnable(DisruptorQueue deserializeQueue,
- DisruptorQueue exeQueue) {
+ DeserializeRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) {
this.deserializeQueue = deserializeQueue;
this.exeQueue = exeQueue;
}
@@ -162,24 +147,22 @@ public class TaskReceiver {
}
return tuple;
+ } catch (KryoException e) {
+ throw new RuntimeException(e);
} catch (Throwable e) {
if (taskStatus.isShutdown() == false) {
- LOG.error(
- idStr + " recv thread error "
- + JStormUtils.toPrintableString(ser_msg)
- + "\n", e);
+ LOG.error(idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
}
} finally {
long end = System.nanoTime();
- deserializeTimer.update((end - start)/1000000.0d);
+ deserializeTimer.update((end - start) / TimeUtils.NS_PER_US);
}
return null;
}
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Object tuple = deserialize((byte[]) event);
if (tuple != null) {
@@ -189,7 +172,7 @@ public class TaskReceiver {
@Override
public void preRun() {
- WorkerClassLoader.switchThreadContext();
+ WorkerClassLoader.switchThreadContext();
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
index c49e9fc..d685c04 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
@@ -17,36 +17,31 @@
*/
package com.alibaba.jstorm.task;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
-import backtype.storm.topology.IConfig;
+import backtype.storm.topology.IDynamicComponent;
import backtype.storm.utils.WorkerClassLoader;
-
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.worker.ShutdownableDameon;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
/**
* shutdown one task
*
* @author yannian/Longda
- *
*/
public class TaskShutdownDameon implements ShutdownableDameon {
- private static Logger LOG = LoggerFactory
- .getLogger(TaskShutdownDameon.class);
+ private static Logger LOG = LoggerFactory.getLogger(TaskShutdownDameon.class);
public static final byte QUIT_MSG = (byte) 0xff;
+ private Task task;
private TaskStatus taskStatus;
private String topology_id;
private Integer task_id;
@@ -55,16 +50,15 @@ public class TaskShutdownDameon implements ShutdownableDameon {
private Object task_obj;
private boolean isClosed = false;
- public TaskShutdownDameon(TaskStatus taskStatus, String topology_id,
- Integer task_id, List<AsyncLoopThread> all_threads,
- StormClusterState zkCluster, Object task_obj) {
+ public TaskShutdownDameon(TaskStatus taskStatus, String topology_id, Integer task_id, List<AsyncLoopThread> all_threads, StormClusterState zkCluster,
+ Object task_obj, Task task) {
this.taskStatus = taskStatus;
this.topology_id = topology_id;
this.task_id = task_id;
this.all_threads = all_threads;
this.zkCluster = zkCluster;
this.task_obj = task_obj;
-
+ this.task = task;
}
@Override
@@ -104,18 +98,9 @@ public class TaskShutdownDameon implements ShutdownableDameon {
closeComponent(task_obj);
try {
- JStormMetrics.unregisterTask(task_id);
- TaskHeartbeatRunable.unregisterTaskStats(task_id);
- zkCluster.remove_task_heartbeat(topology_id, task_id);
+ zkCluster.disconnect();
} catch (Exception e) {
- // TODO Auto-generated catch block
- LOG.info("Failed to cleanup");
- } finally {
- try {
- zkCluster.disconnect();
- } catch (Exception e) {
- LOG.info("Failed to disconnect", e);
- }
+ LOG.error("Failed to disconnect zk for task-" + task_id);
}
LOG.info("Successfully shutdown task " + topology_id + ":" + task_id);
@@ -170,19 +155,22 @@ public class TaskShutdownDameon implements ShutdownableDameon {
}
}
- public void updateConf(Map conf) {
- if (task_obj instanceof IConfig) {
- ((IConfig) task_obj).updateConf(conf);
+ public void update(Map conf) {
+ if (task_obj instanceof IDynamicComponent) {
+ ((IDynamicComponent) task_obj).update(conf);
}
}
@Override
public void run() {
- // TODO Auto-generated method stub
shutdown();
}
public int getTaskId() {
return this.task_id;
}
+
+ public Task getTask() {
+ return this.task;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
index efe6dee..4da4330 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
@@ -24,40 +24,44 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.serialization.KryoTupleSerializer;
-import backtype.storm.tuple.TupleExt;
-import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.WorkerClassLoader;
-
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.common.metric.Timer;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.backpressure.BackpressureController;
import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.serialization.KryoTupleSerializer;
+import backtype.storm.tuple.ITupleExt;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.utils.DisruptorQueue;
+import backtype.storm.utils.Utils;
+import backtype.storm.utils.WorkerClassLoader;
+
/**
* Sending entrance
- *
+ * <p/>
* Task sending all tuples through this Object
- *
+ * <p/>
* Serialize the Tuple and put the serialized data to the sending queue
*
* @author yannian
- *
*/
public class TaskTransfer {
@@ -71,15 +75,18 @@ public class TaskTransfer {
protected final AsyncLoopThread serializeThread;
protected volatile TaskStatus taskStatus;
protected String taskName;
- protected Timer timer;
+ protected AsmHistogram serializeTimer;
protected Task task;
-
+ protected String topolgyId;
+ protected String componentId;
+ protected int taskId;
+
protected ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
protected ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
- public TaskTransfer(Task task, String taskName,
- KryoTupleSerializer serializer, TaskStatus taskStatus,
- WorkerData workerData) {
+ protected BackpressureController backpressureController;
+
+ public TaskTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
this.task = task;
this.taskName = taskName;
this.serializer = serializer;
@@ -87,49 +94,53 @@ public class TaskTransfer {
this.storm_conf = workerData.getStormConf();
this.transferQueue = workerData.getTransferQueue();
this.innerTaskTransfer = workerData.getInnerTaskTransfer();
-
+
this.nodeportSocket = workerData.getNodeportSocket();
this.taskNodeport = workerData.getTaskNodeport();
- int queue_size =
- Utils.getInt(storm_conf
- .get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
- WaitStrategy waitStrategy =
- (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
- this.serializeQueue =
- DisruptorQueue.mkInstance(taskName, ProducerType.MULTI,
- queue_size, waitStrategy);
+ this.topolgyId = workerData.getTopologyId();
+ this.componentId = this.task.getComponentId();
+ this.taskId = this.task.getTaskId();
+
+ int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
+ WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
+ this.serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, queue_size, waitStrategy);
this.serializeQueue.consumerStarted();
String taskId = taskName.substring(taskName.indexOf(":") + 1);
- String metricName =
- MetricRegistry.name(MetricDef.SERIALIZE_QUEUE, taskName);
- QueueGauge serializeQueueGauge =
- new QueueGauge(metricName, serializeQueue);
- JStormMetrics.registerTaskGauge(serializeQueueGauge,
- Integer.valueOf(taskId), MetricDef.SERIALIZE_QUEUE);
- JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId),
- MetricDef.SERIALIZE_QUEUE, serializeQueueGauge);
- timer =
- JStormMetrics.registerTaskTimer(Integer.valueOf(taskId),
- MetricDef.SERIALIZE_TIME);
+ QueueGauge serializeQueueGauge = new QueueGauge(serializeQueue, taskName, MetricDef.SERIALIZE_QUEUE);
+ JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topolgyId, componentId, this.taskId, MetricDef.SERIALIZE_QUEUE, MetricType.GAUGE),
+ new AsmGauge(serializeQueueGauge));
+ JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId), MetricDef.SERIALIZE_QUEUE, serializeQueueGauge);
+ serializeTimer =
+ (AsmHistogram) JStormMetrics.registerTaskMetric(
+ MetricUtils.taskMetricName(topolgyId, componentId, this.taskId, MetricDef.SERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
serializeThread = setupSerializeThread();
+
+ backpressureController = new BackpressureController(storm_conf, task.getTaskId(), serializeQueue, queue_size);
LOG.info("Successfully start TaskTransfer thread");
}
public void transfer(TupleExt tuple) {
- int taskid = tuple.getTargetTaskId();
+ int taskId = tuple.getTargetTaskId();
- DisruptorQueue exeQueue = innerTaskTransfer.get(taskid);
+ DisruptorQueue exeQueue = innerTaskTransfer.get(taskId);
if (exeQueue != null) {
exeQueue.publish(tuple);
} else {
- serializeQueue.publish(tuple);
+ push(taskId, tuple);
}
+ if (backpressureController.isBackpressureMode()) {
+ backpressureController.flowControl();
+ }
+ }
+
+ public void push(int taskId, TupleExt tuple) {
+ serializeQueue.publish(tuple);
}
protected AsyncLoopThread setupSerializeThread() {
@@ -140,6 +151,10 @@ public class TaskTransfer {
return serializeThread;
}
+ public BackpressureController getBackpressureController() {
+ return backpressureController;
+ }
+
protected class TransferRunnable extends RunnableCallback implements EventHandler {
private AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();
@@ -148,6 +163,7 @@ public class TaskTransfer {
public String getThreadName() {
return taskName + "-" + TransferRunnable.class.getSimpleName();
}
+
@Override
public void preRun() {
@@ -156,61 +172,80 @@ public class TaskTransfer {
@Override
public void run() {
-
while (shutdown.get() == false) {
serializeQueue.consumeBatchWhenAvailable(this);
-
}
-
}
@Override
public void postRun() {
WorkerClassLoader.restoreThreadContext();
}
+
+ public byte[] serialize(ITupleExt tuple) {
+ return serializer.serialize((TupleExt)tuple);
+ }
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
if (event == null) {
return;
}
- long start = System.currentTimeMillis();
+ long start = System.nanoTime();
try {
- TupleExt tuple = (TupleExt) event;
+
+ ITupleExt tuple = (ITupleExt) event;
int taskid = tuple.getTargetTaskId();
- byte[] tupleMessage = serializer.serialize(tuple);
- TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
IConnection conn = getConnection(taskid);
if (conn != null) {
+ byte[] tupleMessage = serialize(tuple);
+ TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
conn.send(taskMessage);
}
} finally {
- long end = System.currentTimeMillis();
- timer.update(end - start);
+ long end = System.nanoTime();
+ serializeTimer.update((end - start)/TimeUtils.NS_PER_US);
}
}
-
+
protected IConnection getConnection(int taskId) {
IConnection conn = null;
WorkerSlot nodePort = taskNodeport.get(taskId);
if (nodePort == null) {
- String errormsg = "can`t not found IConnection to " + taskId;
- LOG.warn("Intra transfer warn", new Exception(errormsg));
+ String errormsg = "IConnection to " + taskId + " can't be found";
+ LOG.warn("Internal transfer warn, throw tuple,", new Exception(errormsg));
} else {
conn = nodeportSocket.get(nodePort);
if (conn == null) {
- String errormsg = "can`t not found nodePort " + nodePort;
- LOG.warn("Intra transfer warn", new Exception(errormsg));
+ String errormsg = "NodePort to" + nodePort + " can't be found";
+ LOG.warn("Internal transfer warn, throw tuple,", new Exception(errormsg));
}
}
return conn;
}
+ protected void pullTuples(Object event) {
+ TupleExt tuple = (TupleExt) event;
+ int taskid = tuple.getTargetTaskId();
+ IConnection conn = getConnection(taskid);
+ if (conn != null) {
+ while (conn.available() == false) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+
+ }
+ }
+ byte[] tupleMessage = serializer.serialize(tuple);
+ TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
+ conn.send(taskMessage);
+ }
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
index 596fa35..f703f25 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
@@ -17,12 +17,12 @@
*/
package com.alibaba.jstorm.task;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
+import backtype.storm.generated.TaskHeartbeat;
+
import com.alibaba.jstorm.utils.TimeUtils;
/**
- * TkHbCacheTime is describle taskheartcache (Map<topologyId, Map<taskid,
- * Map<tkHbCacheTime, time>>>)
+ * TkHbCacheTime describes the task heartbeat cache (Map<topologyId, Map<taskid, Map<tkHbCacheTime, time>>>)
*/
public class TkHbCacheTime {
@@ -54,12 +54,13 @@ public class TkHbCacheTime {
this.taskAssignedTime = taskAssignedTime;
}
- public void update(TaskHeartbeat zkTaskHeartbeat) {
- int nowSecs = TimeUtils.current_time_secs();
- this.nimbusTime = nowSecs;
- this.taskReportedTime = zkTaskHeartbeat.getTimeSecs();
- this.taskAssignedTime =
- zkTaskHeartbeat.getTimeSecs() - zkTaskHeartbeat.getUptimeSecs();
+ /**
+  * Refreshes the cached times from a task heartbeat.
+  *
+  * A null heartbeat leaves every field untouched.
+  *
+  * @param taskHeartbeat heartbeat to read the report time and uptime from; may be null
+  */
+ public void update(TaskHeartbeat taskHeartbeat) {
+     if (taskHeartbeat == null) {
+         return;
+     }
+
+     this.nimbusTime = TimeUtils.current_time_secs();
+     this.taskReportedTime = taskHeartbeat.get_time();
+     // Assigned time is derived as report time minus uptime.
+     this.taskAssignedTime = taskHeartbeat.get_time() - taskHeartbeat.get_uptime();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
index 2be1592..4deb8ed 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
@@ -17,25 +17,21 @@
*/
package com.alibaba.jstorm.task.acker;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
-
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
/**
- *
* @author yannian/Longda
- *
*/
public class Acker implements IBolt {
@@ -57,27 +53,19 @@ public class Acker implements IBolt {
private long rotateTime;
@Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
// pending = new TimeCacheMap<Object, AckObject>(timeoutSec,
// TIMEOUT_BUCKET_NUM);
this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
- this.rotateTime =
- 1000L
- * JStormUtils.parseInt(stormConf
- .get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30)
- / (TIMEOUT_BUCKET_NUM - 1);
+ this.rotateTime = 1000L * JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30) / (TIMEOUT_BUCKET_NUM - 1);
}
@Override
public void execute(Tuple input) {
Object id = input.getValue(0);
-
AckObject curr = pending.get(id);
-
String stream_id = input.getSourceStreamId();
-
if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {
if (curr == null) {
curr = new AckObject();
@@ -95,17 +83,13 @@ public class Acker implements IBolt {
} else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {
if (curr != null) {
curr.update_ack(input.getValue(1));
-
} else {
// two case
// one is timeout
// the other is bolt's ack first come
curr = new AckObject();
-
- curr.val = Long.valueOf(input.getLong(1));
-
+ curr.val = input.getLong(1);
pending.put(id, curr);
-
}
} else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {
if (curr == null) {
@@ -113,31 +97,23 @@ public class Acker implements IBolt {
// already timeout, should go fail
return;
}
-
curr.failed = true;
-
} else {
LOG.info("Unknow source stream");
return;
}
Integer task = curr.spout_task;
-
if (task != null) {
-
if (curr.val == 0) {
pending.remove(id);
List values = JStormUtils.mk_list(id);
-
collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
-
} else {
-
if (curr.failed) {
pending.remove(id);
List values = JStormUtils.mk_list(id);
- collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID,
- values);
+ collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values);
}
}
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java
new file mode 100644
index 0000000..528fc6b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.util.Map;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+/**
+ * Base class holding the backpressure settings shared by its subclasses.
+ *
+ * The settings are read from the topology configuration when the object is constructed and
+ * can be refreshed later through {@link #updateConfig(Map)}.
+ */
+public abstract class Backpressure {
+    private static final String BACKPRESSURE_DELAY_TIME = "topology.backpressure.delay.time";
+
+    protected volatile boolean isBackpressureEnable;
+
+    protected volatile double highWaterMark;
+    protected volatile double lowWaterMark;
+
+    protected volatile double triggerBpRatio;
+
+    // Not initialised by the constructor; only set from BACKPRESSURE_DELAY_TIME in updateConfig
+    // (subclasses may also write it).
+    protected volatile long sleepTime;
+
+    /**
+     * Reads the enable flag, both water marks and the coordinator ratio from the configuration.
+     *
+     * @param stormConf topology configuration
+     */
+    public Backpressure(Map stormConf) {
+        this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf);
+        this.highWaterMark = ConfigExtension.getBackpressureWaterMarkHigh(stormConf);
+        this.lowWaterMark = ConfigExtension.getBackpressureWaterMarkLow(stormConf);
+        this.triggerBpRatio = ConfigExtension.getBackpressureCoordinatorRatio(stormConf);
+    }
+
+    /**
+     * Applies the backpressure settings present in the given configuration.
+     *
+     * Only keys contained in the map are applied; every other setting keeps its current value.
+     *
+     * @param stormConf configuration fragment to apply; null is ignored
+     */
+    protected void updateConfig(Map stormConf) {
+        if (stormConf == null) {
+            return;
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE)) {
+            this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf);
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH)) {
+            this.highWaterMark = ConfigExtension.getBackpressureWaterMarkHigh(stormConf);
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW)) {
+            this.lowWaterMark = ConfigExtension.getBackpressureWaterMarkLow(stormConf);
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO)) {
+            this.triggerBpRatio = ConfigExtension.getBackpressureCoordinatorRatio(stormConf);
+        }
+
+        if (stormConf.containsKey(BACKPRESSURE_DELAY_TIME)) {
+            // Parse the configured value, not the configuration map itself; passing the map
+            // meant the delay setting could never be applied.
+            long time = JStormUtils.parseLong(stormConf.get(BACKPRESSURE_DELAY_TIME), 0L);
+            if (time != 0L) {
+                this.sleepTime = time;
+            }
+        }
+    }
+
+    /**
+     * Tells whether the given configuration contains at least one backpressure setting.
+     *
+     * @param stormConf configuration fragment to inspect; may be null
+     * @return true when any backpressure key is present, false otherwise (including for null)
+     */
+    public boolean isBackpressureConfigChange(Map stormConf) {
+        if (stormConf == null) {
+            return false;
+        }
+
+        return stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE)
+                || stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH)
+                || stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW)
+                || stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO)
+                || stormConf.containsKey(BACKPRESSURE_DELAY_TIME);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java
new file mode 100644
index 0000000..82dd938
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.task.TaskTransfer;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.DisruptorQueue;
+
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+/**
+ * Flow Control
+ *
+ * @author Basti Liu
+ */
+public class BackpressureController extends Backpressure {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureController.class);
+
+    private int taskId;
+    // Queue whose population is compared against queueSizeReduced.
+    private DisruptorQueue queueControlled;
+    private int totalQueueSize;
+    // Population limit used by isQueueCapacityAvailable(); lowered by start(), restored to
+    // totalQueueSize by resetBackpressureInfo().
+    private int queueSizeReduced;
+
+    private boolean isBackpressureMode = false;
+
+    private SpoutOutputCollector outputCollector;
+
+    // Bounds tracked by start() (maxBound) and stop() (minBound) while adjusting sleepTime.
+    private long maxBound, minBound;
+
+    /**
+     * @param conf      topology configuration, forwarded to {@link Backpressure}
+     * @param taskId    id of the task this controller belongs to (used in log messages)
+     * @param queue     queue whose population is monitored
+     * @param queueSize full size of the queue; also the initial population limit
+     */
+    public BackpressureController(Map conf, int taskId, DisruptorQueue queue, int queueSize) {
+        super(conf);
+        this.queueControlled = queue;
+        this.totalQueueSize = queueSize;
+        this.queueSizeReduced = queueSize;
+        this.taskId = taskId;
+        this.maxBound = 0L;
+        this.minBound = 0L;
+    }
+
+    public void setOutputCollector(SpoutOutputCollector outputCollector) {
+        this.outputCollector = outputCollector;
+    }
+
+    /**
+     * Handles a control event: start backpressure, stop backpressure, or update the configuration.
+     *
+     * Nothing is processed while backpressure is disabled.
+     *
+     * @param ctrlEvent the control event to handle
+     */
+    public void control(TopoMasterCtrlEvent ctrlEvent) {
+        // NOTE(review): this early return also skips updateBackpressureConfig events, so a
+        // disabled controller cannot be re-enabled through this method - confirm whether
+        // another code path applies configuration updates while disabled.
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        EventType eventType = ctrlEvent.getEventType();
+        LOG.debug("Received control event, " + eventType.toString());
+        if (eventType.equals(EventType.startBackpressure)) {
+            List<Object> value = ctrlEvent.getEventValue();
+            // A missing value list or element is treated like a null element: flow control time 0.
+            int flowCtrlTime = 0;
+            if (value != null && !value.isEmpty() && value.get(0) != null) {
+                flowCtrlTime = (Integer) value.get(0);
+            }
+            start(flowCtrlTime);
+        } else if (eventType.equals(EventType.stopBackpressure)) {
+            stop();
+        } else if (eventType.equals(EventType.updateBackpressureConfig)) {
+            List<Object> value = ctrlEvent.getEventValue();
+            if (value != null) {
+                Map stormConf = (Map) value.get(0);
+                updateConfig(stormConf);
+
+                if (isBackpressureEnable == false) {
+                    LOG.info("Disable backpressure in controller.");
+                    resetBackpressureInfo();
+                } else {
+                    LOG.info("Enable backpressure in controller");
+                }
+            }
+        }
+    }
+
+    /**
+     * Blocks the calling thread for {@code sleepTime} milliseconds and then until the queue
+     * population is below the current limit. Returns immediately when backpressure is disabled.
+     *
+     * If the thread is interrupted the wait ends early and the interrupt status is restored.
+     */
+    public void flowControl() {
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        try {
+            Thread.sleep(sleepTime);
+            while (isQueueCapacityAvailable() == false) {
+                Thread.sleep(1);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Sleep was interrupted!");
+            // Keep the interrupt visible to the caller instead of silently dropping it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void resetBackpressureInfo() {
+        sleepTime = 0L;
+        maxBound = 0L;
+        minBound = 0L;
+        queueSizeReduced = totalQueueSize;
+        isBackpressureMode = false;
+    }
+
+    /**
+     * Enters backpressure mode: raises sleepTime, tracks the highest value seen in maxBound and
+     * shrinks the queue population limit to 1% of the queue size (at least 10).
+     *
+     * JStormUtils.halfValueOfSum presumably returns the midpoint of its two arguments, with the
+     * boolean selecting the rounding direction - confirm in JStormUtils.
+     *
+     * @param flowCtrlTime requested flow control time; values <= 0 only increment sleepTime
+     */
+    private void start(int flowCtrlTime) {
+        if (flowCtrlTime > 0) {
+            if (maxBound < flowCtrlTime) {
+                sleepTime = flowCtrlTime;
+            } else if (maxBound == flowCtrlTime) {
+                if (sleepTime >= maxBound) {
+                    sleepTime++;
+                } else {
+                    sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true);
+                }
+            } else {
+                if (maxBound <= sleepTime) {
+                    sleepTime++;
+                } else {
+                    if (sleepTime >= flowCtrlTime) {
+                        sleepTime = JStormUtils.halfValueOfSum(maxBound, sleepTime, true);
+                    } else {
+                        sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true);
+                    }
+                }
+            }
+        } else {
+            sleepTime++;
+        }
+        if (sleepTime > maxBound) {
+            maxBound = sleepTime;
+        }
+
+        int size = totalQueueSize / 100;
+        queueSizeReduced = size > 10 ? size : 10;
+        isBackpressureMode = true;
+
+        LOG.info("Start backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}", taskId, sleepTime, queueSizeReduced, flowCtrlTime);
+    }
+
+    /**
+     * Lowers sleepTime towards minBound. Once sleepTime reaches 0 the controller leaves
+     * backpressure mode and emits a stopBackpressure event on the topology master control stream.
+     */
+    private void stop() {
+        if (sleepTime == minBound) {
+            minBound = 0;
+        }
+        sleepTime = JStormUtils.halfValueOfSum(minBound, sleepTime, false);
+
+        if (sleepTime == 0) {
+            resetBackpressureInfo();
+
+            TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
+            outputCollector.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopBp));
+        } else {
+            minBound = sleepTime;
+        }
+
+        // The message has exactly one placeholder per argument; there is no flow control time here.
+        LOG.info("Stop backpressure at spout-{}, sleepTime={}, queueSizeReduced={}", taskId, sleepTime, queueSizeReduced);
+    }
+
+    /**
+     * @return true when backpressure is enabled and the controller is currently in backpressure mode
+     */
+    public boolean isBackpressureMode() {
+        return isBackpressureMode && isBackpressureEnable;
+    }
+
+    /**
+     * @return true when the monitored queue's population is below the current limit
+     */
+    public boolean isQueueCapacityAvailable() {
+        return (queueControlled.population() < queueSizeReduced);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java
new file mode 100644
index 0000000..d270078
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import backtype.storm.generated.*;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.*;
+import com.alibaba.jstorm.task.acker.Acker;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * Coordinator is responsible for the request from trigger and controller.
+ * - Event from trigger:
+ * Find relative controllers (source spouts), and decide if it is required to send out the request.
+ * - Event from controller:
+ * If backpressure stop event, send stop request to all target triggers.
+ *
+ * @author Basti Li
+ */
+public class BackpressureCoordinator extends Backpressure {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureCoordinator.class);
+
+    // Margin used in handleEventFromBolt: a start request is forwarded when its flow control time
+    // exceeds the recorded maximum by more than this value.
+    private static final int adjustedTime = 5;
+
+    private TopologyContext context;
+    private StormTopology topology;
+    private OutputCollector output;
+
+    private int topologyMasterId;
+    private Map<Integer, String> taskIdToComponentId;
+    private Map<String, SpoutSpec> spouts;
+    private Map<String, Bolt> bolts;
+
+    // Map<source componentId, Map<ComponentId, backpressure info>>
+    private Map<String, SourceBackpressureInfo> SourceTobackpressureInfo;
+
+    // Check interval multiplied by the trigger sample number; compared against
+    // System.currentTimeMillis() differences.
+    private Integer period;
+
+    private StormClusterState zkCluster;
+    private static final String BACKPRESSURE_TAG = "Backpressure has been ";
+
+
+    /**
+     * Builds the coordinator from the topology context and restores any backpressure records
+     * previously stored in zk; when none can be read it starts with an empty record map.
+     *
+     * @param output          collector used to send control events to tasks
+     * @param topologyContext context of the task hosting the coordinator
+     * @param taskId          id of the topology master task
+     */
+    public BackpressureCoordinator(OutputCollector output, TopologyContext topologyContext, Integer taskId) {
+        super(topologyContext.getStormConf());
+        this.context = topologyContext;
+        this.topology = topologyContext.getRawTopology();
+        this.spouts = new HashMap<String, SpoutSpec>();
+        if (this.topology.get_spouts() != null) {
+            this.spouts.putAll(this.topology.get_spouts());
+        }
+        this.bolts = new HashMap<String, Bolt>();
+        if (this.topology.get_bolts() != null) {
+            this.bolts.putAll(this.topology.get_bolts());
+        }
+        this.taskIdToComponentId = topologyContext.getTaskToComponent();
+        this.topologyMasterId = taskId;
+
+        this.output = output;
+
+        int checkInterval = ConfigExtension.getBackpressureCheckIntervl(context.getStormConf());
+        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(context.getStormConf());
+        this.period = checkInterval * sampleNum;
+
+        this.zkCluster = topologyContext.getZkCluster();
+        try {
+            this.SourceTobackpressureInfo = zkCluster.get_backpressure_info(context.getTopologyId());
+            if (this.SourceTobackpressureInfo == null) {
+                this.SourceTobackpressureInfo = new HashMap<String, SourceBackpressureInfo>();
+            } else {
+                LOG.info("Successfully retrieve existing SourceTobackpressureInfo from zk: " + SourceTobackpressureInfo);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get SourceTobackpressureInfo from zk", e);
+            this.SourceTobackpressureInfo = new HashMap<String, SourceBackpressureInfo>();
+        }
+    }
+
+    /**
+     * Collects the spouts that feed a bolt, directly or through upstream bolts.
+     *
+     * @param topology            the topology (only passed along on recursion)
+     * @param boltComponentId     component id of the bolt to start from
+     * @param componentsTraversed components already visited; null starts a fresh traversal
+     * @return component ids of the spouts found; empty when the id is not a known bolt
+     */
+    private Set<String> getInputSpoutsForBolt(StormTopology topology, String boltComponentId, Set<String> componentsTraversed) {
+        Set<String> ret = new TreeSet<String>();
+
+        if (componentsTraversed == null) {
+            componentsTraversed = new HashSet<String>();
+        }
+
+        Bolt bolt = bolts.get(boltComponentId);
+        if (bolt == null) {
+            return ret;
+        }
+
+        ComponentCommon common = bolt.get_common();
+        Set<GlobalStreamId> inputstreams = common.get_inputs().keySet();
+        Set<String> inputComponents = new TreeSet<String>();
+        for (GlobalStreamId streamId : inputstreams) {
+            inputComponents.add(streamId.get_componentId());
+        }
+
+        for (String inputComponent : inputComponents) {
+            // Skip the components which has been traversed before, to avoid dead loop when there are loop bolts in topology
+            if (!componentsTraversed.add(inputComponent)) {
+                continue;
+            }
+
+            if (spouts.containsKey(inputComponent)) {
+                ret.add(inputComponent);
+            } else if (bolts.containsKey(inputComponent)) {
+                Set<String> inputs = getInputSpoutsForBolt(topology, inputComponent, componentsTraversed);
+                ret.addAll(inputs);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Dispatches a control tuple according to the component type of its source task.
+     *
+     * Nothing is done while backpressure is disabled. Events from spouts are handled only when
+     * a backpressure record exists for the spout component.
+     *
+     * @param input the received control tuple
+     */
+    public void process(Tuple input) {
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        int sourceTask = input.getSourceTask();
+        String componentId = taskIdToComponentId.get(sourceTask);
+        if (componentId == null) {
+            LOG.warn("Receive tuple from unknown task-" + sourceTask);
+            return;
+        }
+
+        if (spouts.containsKey(componentId)) {
+            if (SourceTobackpressureInfo.get(componentId) != null) {
+                handleEventFromSpout(sourceTask, input);
+            }
+        } else if (bolts.containsKey(componentId)) {
+            handleEventFromBolt(sourceTask, input);
+        }
+    }
+
+    /**
+     * Applies a configuration update locally, forwards it to every task except the topology
+     * master and the acker tasks, and reports the resulting backpressure status.
+     *
+     * @param conf configuration fragment carrying the changed settings
+     */
+    public void updateBackpressureConfig(Map conf) {
+        updateConfig(conf);
+
+        if (isBackpressureEnable == false) {
+            LOG.info("Disable backpressure in coordinator.");
+            SourceTobackpressureInfo.clear();
+        } else {
+            LOG.info("Enable backpressure in coordinator.");
+        }
+
+        TopoMasterCtrlEvent updateBpConfig = new TopoMasterCtrlEvent(EventType.updateBackpressureConfig, new ArrayList<Object>());
+        updateBpConfig.addEventValue(conf);
+        Values values = new Values(updateBpConfig);
+        Set<Integer> targetTasks = new TreeSet<Integer>(taskIdToComponentId.keySet());
+        targetTasks.remove(topologyMasterId);
+        targetTasks.removeAll(context.getComponentTasks(Acker.ACKER_COMPONENT_ID));
+        sendBackpressureMessage(targetTasks, values, EventType.updateBackpressureConfig);
+
+        reportBackpressureStatus();
+    }
+
+    /**
+     * @param spouts spout component ids to check; may be null
+     * @return true when at least one of the spouts has a record with a non-empty task set
+     */
+    private boolean checkSpoutsUnderBackpressure(Set<String> spouts) {
+        boolean ret = false;
+
+        if (spouts != null) {
+            for (String spout : spouts) {
+                SourceBackpressureInfo backpressureInfo = SourceTobackpressureInfo.get(spout);
+                if (backpressureInfo != null && backpressureInfo.getTasks().size() > 0) {
+                    ret = true;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Looks up the record kept for a (source spout, target component) pair.
+     *
+     * @param sourceSpout       component id of the source spout
+     * @param targetComponentId component id of the target
+     * @param created           when true, missing source and target records are created and stored
+     * @return the target record, or null when it does not exist and {@code created} is false
+     */
+    private TargetBackpressureInfo getBackpressureInfoBySourceSpout(String sourceSpout, String targetComponentId, boolean created) {
+        TargetBackpressureInfo ret = null;
+
+        SourceBackpressureInfo info = SourceTobackpressureInfo.get(sourceSpout);
+        if (info == null) {
+            if (created) {
+                info = new SourceBackpressureInfo();
+                SourceTobackpressureInfo.put(sourceSpout, info);
+            }
+        } else {
+            ret = info.getTargetTasks().get(targetComponentId);
+        }
+
+        // info is non-null here whenever created is true.
+        if (ret == null && created) {
+            ret = new TargetBackpressureInfo();
+            info.getTargetTasks().put(targetComponentId, ret);
+        }
+        return ret;
+    }
+
+    /**
+     * @param time a timestamp taken from System.currentTimeMillis(); 0 means "never set"
+     * @return true when the timestamp is set and more than {@code period} has elapsed since then
+     */
+    private boolean checkIntervalExpired(long time) {
+        boolean ret = false;
+        if (time != 0) {
+            if (System.currentTimeMillis() - time > period) {
+                ret = true;
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Emits the given values directly to each target task on the topology master control stream.
+     */
+    private void sendBackpressureMessage(Set<Integer> targetTasks, Values value, EventType backpressureType) {
+        for (Integer taskId : targetTasks) {
+            output.emitDirect(taskId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, value);
+            LOG.debug("Send " + backpressureType.toString() + " request to taskId-" + taskId);
+        }
+    }
+
+    /**
+     * Handles a control event sent by a spout task. Only stopBackpressure is expected: the
+     * spout's record is dropped and, once no task of the spout remains in it, the stop event is
+     * forwarded to every target component none of whose source spouts is still under backpressure.
+     */
+    private void handleEventFromSpout(int sourceTask, Tuple input) {
+        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
+        EventType type = ctrlEvent.getEventType();
+
+        boolean update = false;
+        if (type.equals(EventType.stopBackpressure)) {
+            String spoutComponentId = taskIdToComponentId.get(sourceTask);
+            // NOTE(review): the record is removed on the first stop event of any task of this
+            // spout, even if other tasks are still listed in it; process() then ignores later
+            // stop events of that spout - confirm this is intended rather than a get().
+            SourceBackpressureInfo info = SourceTobackpressureInfo.remove(spoutComponentId);
+            if (info != null) {
+                info.getTasks().remove(sourceTask);
+                if (info.getTasks().size() == 0) {
+                    for (Entry<String, TargetBackpressureInfo> entry : info.getTargetTasks().entrySet()) {
+                        String componentId = entry.getKey();
+
+                        // Make sure if all source spouts for this bolt are NOT under backpressure mode.
+                        Set<String> sourceSpouts = getInputSpoutsForBolt(topology, componentId, null);
+                        if (checkSpoutsUnderBackpressure(sourceSpouts) == false) {
+                            Set<Integer> tasks = new TreeSet<Integer>();
+                            tasks.addAll(context.getComponentTasks(componentId));
+                            sendBackpressureMessage(tasks, new Values(ctrlEvent), type);
+                        }
+                    }
+                }
+                update = true;
+            } else {
+                LOG.error("Received event from non-recorded spout-" + sourceTask);
+            }
+
+        } else {
+            LOG.warn("Received unexpected event, " + type.toString());
+        }
+
+        // If task set under backpressure has been changed, report the latest status
+        if (update) {
+            reportBackpressureStatus();
+        }
+    }
+
+    /**
+     * Handles a start/stop backpressure event sent by a bolt task, updates the records of the
+     * bolt's source spouts and forwards a request to the tasks collected in the notify list.
+     */
+    private void handleEventFromBolt(int sourceTask, Tuple input) {
+        String componentId = taskIdToComponentId.get(sourceTask);
+        Set<String> inputSpouts = getInputSpoutsForBolt(topology, componentId, null);
+
+        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
+        EventType type = ctrlEvent.getEventType();
+        Set<Integer> notifyList = new TreeSet<Integer>();
+        Values values = null;
+        TargetBackpressureInfo info = null;
+        boolean update = false;
+        if (type.equals(EventType.startBackpressure)) {
+            int flowCtrlTime = (Integer) ctrlEvent.getEventValue().get(0);
+            for (String spout : inputSpouts) {
+                info = getBackpressureInfoBySourceSpout(spout, componentId, true);
+                // Non-null: the lookup above creates the source record when it is missing.
+                SourceBackpressureInfo sourceInfo = SourceTobackpressureInfo.get(spout);
+                update = info.getTasks().add(sourceTask);
+                boolean add = false;
+                if (System.currentTimeMillis() - sourceInfo.getLastestTimeStamp() > period) {
+                    add = true;
+                } else {
+                    EventType lastestBpEvent = sourceInfo.getLastestBackpressureEvent();
+                    if (lastestBpEvent != null && lastestBpEvent.equals(EventType.startBackpressure) == false) {
+                        add = true;
+                    }
+
+                    int maxFlowCtrlTime = sourceInfo.getMaxFlowCtrlTime();
+                    if ((flowCtrlTime - maxFlowCtrlTime > adjustedTime || maxFlowCtrlTime == -1) &&
+                            flowCtrlTime >= 0) {
+                        add = true;
+                    }
+                }
+                info.setFlowCtrlTime(flowCtrlTime);
+                info.setBackpressureStatus(type);
+
+                if (add) {
+                    info.setTimeStamp(System.currentTimeMillis());
+                    // Only when the number of bolt tasks sending request is more than a configured number, coordinator will
+                    // send out backpressure request to controller. It is used to avoid the problem that very few tasks might
+                    // cause the over control.
+                    double taskBpRatio = Double.valueOf(info.getTasks().size()) / Double.valueOf(context.getComponentTasks(componentId).size());
+                    if (taskBpRatio >= triggerBpRatio) {
+                        Set<Integer> spoutTasks = new TreeSet<Integer>(context.getComponentTasks(spout));
+                        sourceInfo.getTasks().addAll(spoutTasks);
+                        notifyList.addAll(spoutTasks);
+                    } else {
+                        update = false;
+                    }
+                } else {
+                    update = false;
+                }
+            }
+
+            List<Object> value = new ArrayList<Object>();
+            // info stays null when the bolt has no input spouts; fall back to the requested time
+            // instead of dereferencing null (the notify list is empty in that case anyway).
+            if (info != null) {
+                value.add(info.getFlowCtrlTime());
+            } else {
+                value.add(flowCtrlTime);
+            }
+            TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(EventType.startBackpressure, value);
+            values = new Values(startBp);
+        } else if (type.equals(EventType.stopBackpressure)) {
+            for (String spout : inputSpouts) {
+                // created=false: info is null when this bolt has no record for the spout.
+                info = getBackpressureInfoBySourceSpout(spout, componentId, false);
+                SourceBackpressureInfo sourceInfo = SourceTobackpressureInfo.get(spout);
+                if (info != null) {
+                    Set<Integer> tasks = info.getTasks();
+                    if (tasks != null && tasks.remove(sourceTask)) {
+                        update = true;
+                    }
+                }
+
+                if (sourceInfo != null && checkIntervalExpired(sourceInfo.getLastestTimeStamp())) {
+                    notifyList.addAll(new TreeSet<Integer>(context.getComponentTasks(spout)));
+                    // The target record may be missing even though the source record exists;
+                    // only update it when present instead of failing with a NullPointerException.
+                    if (info != null) {
+                        info.setTimeStamp(System.currentTimeMillis());
+                        info.setBackpressureStatus(type);
+                    }
+                }
+            }
+
+            // Check if all source spouts are Not under backpressure. If so, notify the bolt.
+            if (checkSpoutsUnderBackpressure(inputSpouts) == false) {
+                notifyList.add(sourceTask);
+            }
+
+            TopoMasterCtrlEvent stoptBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
+            values = new Values(stoptBp);
+        } else {
+            LOG.warn("Received unknown event " + type.toString());
+        }
+
+        // For an unknown event the notify list is empty, so the null values are never emitted.
+        sendBackpressureMessage(notifyList, values, type);
+
+        // If task set under backpressure has been changed, report the latest status
+        if (update) {
+            LOG.info("inputspouts=" + inputSpouts + " for " + componentId + "-" + sourceTask + ", eventType=" + type.toString());
+            reportBackpressureStatus();
+        }
+    }
+
+    /**
+     * @return the spout tasks of every source record with a non-empty task set, together with
+     *         the tasks of all target records attached to those sources
+     */
+    private Set<Integer> getTasksUnderBackpressure() {
+        Set<Integer> ret = new HashSet<Integer>();
+
+        for (Entry<String, SourceBackpressureInfo> entry : SourceTobackpressureInfo.entrySet()) {
+            SourceBackpressureInfo sourceInfo = entry.getValue();
+            if (sourceInfo.getTasks().size() > 0) {
+                ret.addAll(sourceInfo.getTasks());
+
+                for (Entry<String, TargetBackpressureInfo> targetEntry : sourceInfo.getTargetTasks().entrySet()) {
+                    ret.addAll(targetEntry.getValue().getTasks());
+                }
+
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Writes the current backpressure status as a task error entry, stores the record map in zk
+     * and logs the status. Failures are logged and not propagated.
+     */
+    private void reportBackpressureStatus() {
+        try {
+            StringBuilder stringBuilder = new StringBuilder();
+            Set<Integer> underTasks = getTasksUnderBackpressure();
+            stringBuilder.append(BACKPRESSURE_TAG);
+            if (underTasks.isEmpty()) {
+                stringBuilder.append("closed ");
+            } else {
+                stringBuilder.append("opened: ");
+                stringBuilder.append(underTasks);
+            }
+            zkCluster.report_task_error(context.getTopologyId(), context.getThisTaskId(), stringBuilder.toString(), BACKPRESSURE_TAG);
+            zkCluster.set_backpressure_info(context.getTopologyId(), SourceTobackpressureInfo);
+            LOG.info(stringBuilder.toString());
+        } catch (Exception e) {
+            LOG.error("can't update backpressure state ", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java
new file mode 100644
index 0000000..0f2df95
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.DisruptorQueue;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.task.Task;
+import com.alibaba.jstorm.task.execute.BoltExecutors;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+import com.alibaba.jstorm.utils.IntervalCheck;
+
+/**
+ * Decides whether back pressure should be triggered for a task.
+ * Under heavy load (the monitored queue fills beyond the high water mark) back pressure is started,
+ * and once the load drops to the low water mark or below, back pressure is stopped.
+ *
+ * @author Basti Liu
+ */
+public class BackpressureTrigger extends Backpressure {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureTrigger.class);
+
+    // Task being monitored and its id
+    private Task task;
+    private int taskId;
+
+    // Queues of the monitored task; looked up lazily in checkAndTrigger()
+    private DisruptorQueue exeQueue;
+    private DisruptorQueue recvQueue;
+
+    private BoltExecutors boltExecutor;
+
+    // Set after a start request has been sent; reset in handle()
+    private volatile boolean isUnderBackpressure = false;
+
+    // Gates how often the collected samples are evaluated
+    private IntervalCheck intervalCheck;
+
+    OutputCollector output;
+
+    // One entry per checkAndTrigger() call since the last evaluation
+    private List<EventType> samplingSet;
+    private double triggerSampleRate;
+
+    /**
+     * Creates the trigger for the given task.
+     * The evaluation interval is the configured sample number multiplied by the configured
+     * check interval. Backpressure is enabled right away when the state read from zk lists
+     * this task as a target.
+     *
+     * @param task         task whose queues are monitored
+     * @param boltExecutor executor providing the execute time sent with start requests
+     * @param stormConf    configuration map
+     * @param output       collector used to emit control events
+     */
+    public BackpressureTrigger(Task task, BoltExecutors boltExecutor, Map stormConf, OutputCollector output) {
+        super(stormConf);
+
+        this.task = task;
+        this.taskId = task.getTaskId();
+        this.boltExecutor = boltExecutor;
+        this.output = output;
+
+        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(stormConf);
+        int checkInterval = ConfigExtension.getBackpressureCheckIntervl(stormConf);
+        IntervalCheck sampleWindow = new IntervalCheck();
+        sampleWindow.setIntervalMs(sampleNum * checkInterval);
+        sampleWindow.start();
+        this.intervalCheck = sampleWindow;
+
+        this.samplingSet = new ArrayList<EventType>();
+        this.triggerSampleRate = ConfigExtension.getBackpressureTriggerSampleRate(stormConf);
+
+        restoreStateFromZk();
+
+        LOG.info("Finished BackpressureTrigger init, highWaterMark=" + highWaterMark + ", lowWaterMark=" + lowWaterMark + ", sendInterval="
+                + intervalCheck.getInterval());
+    }
+
+    /**
+     * Reads the backpressure info stored in zk and enables backpressure when any source
+     * entry lists this task under this task's component id. Failures are only logged.
+     */
+    private void restoreStateFromZk() {
+        try {
+            StormClusterState zkCluster = task.getZkCluster();
+            Map<String, SourceBackpressureInfo> stored = zkCluster.get_backpressure_info(task.getTopologyId());
+            if (stored == null) {
+                return;
+            }
+
+            for (SourceBackpressureInfo source : stored.values()) {
+                Map<String, TargetBackpressureInfo> targets = source.getTargetTasks();
+                if (targets == null) {
+                    continue;
+                }
+
+                TargetBackpressureInfo own = targets.get(task.getComponentId());
+                if (own != null && own.getTasks().contains(taskId)) {
+                    isBackpressureEnable = true;
+                    LOG.info("Retrieved backpressure info for task-" + taskId);
+                }
+            }
+        } catch (Exception e) {
+            LOG.info("Failed to get backpressure info from zk", e);
+        }
+    }
+
+    /**
+     * Records one load sample of the execute queue and, whenever intervalCheck.check()
+     * reports true, evaluates the collected samples and clears them.
+     * Does nothing while backpressure is disabled or the task queues are not available yet.
+     */
+    public void checkAndTrigger() {
+        if (!isBackpressureEnable) {
+            return;
+        }
+        if (!queuesAvailable()) {
+            return;
+        }
+
+        LOG.debug("Backpressure Check: exeQueue load=" + (exeQueue.pctFull() * 100) + ", recvQueue load=" + (recvQueue.pctFull() * 100));
+        samplingSet.add(classifyCurrentLoad());
+
+        if (intervalCheck.check()) {
+            evaluateSamples();
+            samplingSet.clear();
+        }
+    }
+
+    /**
+     * Makes sure both queues have been fetched from the task.
+     *
+     * @return true when both queues are present, false when at least one is still missing
+     */
+    private boolean queuesAvailable() {
+        if (exeQueue != null && recvQueue != null) {
+            return true;
+        }
+
+        exeQueue = task.getExecuteQueue();
+        recvQueue = task.getDeserializeQueue();
+
+        if (exeQueue == null) {
+            LOG.info("Init of excutor-task-" + taskId + " has not been finished!");
+            return false;
+        }
+        if (recvQueue == null) {
+            LOG.info("Init of receiver-task-" + taskId + " has not been finished!");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Maps the current fill level of the execute queue to a sample: start above the high
+     * water mark, stop at or below the low water mark, default otherwise.
+     */
+    private EventType classifyCurrentLoad() {
+        if (exeQueue.pctFull() > highWaterMark) {
+            return EventType.startBackpressure;
+        }
+        if (exeQueue.pctFull() <= lowWaterMark) {
+            return EventType.stopBackpressure;
+        }
+        return EventType.defaultType;
+    }
+
+    /**
+     * Counts start and stop samples and sends the matching request when the dominating
+     * kind also exceeds the configured sample rate. A stop request is only sent while
+     * this trigger considers itself under backpressure.
+     */
+    private void evaluateSamples() {
+        int startCount = 0;
+        int stopCount = 0;
+        for (EventType sample : samplingSet) {
+            if (sample.equals(EventType.startBackpressure)) {
+                startCount++;
+            } else if (sample.equals(EventType.stopBackpressure)) {
+                stopCount++;
+            }
+        }
+
+        if (startCount > stopCount) {
+            if (sampleRateCheck(startCount)) {
+                startBackpressure();
+                isUnderBackpressure = true;
+            }
+        } else if (sampleRateCheck(stopCount) && isUnderBackpressure) {
+            stopBackpressure();
+        }
+    }
+
+    /**
+     * @param count number of samples of one kind
+     * @return true when the share of count among all collected samples is greater than the trigger sample rate
+     */
+    private boolean sampleRateCheck(double count) {
+        return count / samplingSet.size() > triggerSampleRate;
+    }
+
+    /**
+     * Processes a control event carried in the tuple field "ctrlEvent".
+     * A stop event resets the under-backpressure flag; a config update event applies the
+     * new configuration and, if backpressure got disabled, also drops the flag and the
+     * collected samples. Other event types are only logged. Exceptions are logged, not thrown.
+     *
+     * @param input tuple holding the control event
+     */
+    public void handle(Tuple input) {
+        try {
+            TopoMasterCtrlEvent event = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
+            EventType type = event.getEventType();
+
+            if (type.equals(EventType.stopBackpressure)) {
+                isUnderBackpressure = false;
+                LOG.info("Received stop backpressure event for task-" + task.getTaskId());
+                return;
+            }
+
+            if (!type.equals(EventType.updateBackpressureConfig)) {
+                LOG.info("Received unexpected event, " + type.toString());
+                return;
+            }
+
+            Map stormConf = (Map) event.getEventValue().get(0);
+            updateConfig(stormConf);
+
+            if (isBackpressureEnable) {
+                LOG.info("Enable backpressure in trigger.");
+            } else {
+                LOG.info("Disable backpressure in trigger.");
+                isUnderBackpressure = false;
+                samplingSet.clear();
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to handle event", e);
+        }
+    }
+
+    /**
+     * Emits a start-backpressure control event on the topology master control stream.
+     * The event value is the executor's execute time divided by 1000, as an integer.
+     */
+    private void startBackpressure() {
+        int flowCtrl = Double.valueOf(boltExecutor.getExecuteTime() / 1000).intValue();
+
+        List<Object> payload = new ArrayList<Object>();
+        payload.add(flowCtrl);
+
+        TopoMasterCtrlEvent startEvent = new TopoMasterCtrlEvent(EventType.startBackpressure, payload);
+        output.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(startEvent));
+        LOG.debug("Send start backpressure request for task-{}, flowCtrlTime={}", taskId, flowCtrl);
+    }
+
+    /**
+     * Emits a stop-backpressure control event (without value) on the topology master control stream.
+     */
+    private void stopBackpressure() {
+        TopoMasterCtrlEvent stopEvent = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
+        output.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopEvent));
+        LOG.debug("Send stop backpressure request for task-{}", taskId);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java
new file mode 100644
index 0000000..05f7d11
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+/**
+ * Serializable record of the backpressure state kept per source: the source tasks
+ * and, per component id, the info of the targets.
+ */
+public class SourceBackpressureInfo implements Serializable {
+    private static final long serialVersionUID = -8213491092461721871L;
+
+    // Source tasks under backpressure
+    private Set<Integer> tasks;
+
+    // Target tasks which have sent a request to the source task, keyed by component id
+    private Map<String, TargetBackpressureInfo> targetTasks;
+
+    /** Creates an instance with an empty task set and an empty target map. */
+    public SourceBackpressureInfo() {
+        this.tasks = new TreeSet<Integer>();
+        this.targetTasks = new HashMap<String, TargetBackpressureInfo>();
+    }
+
+    /** @return the internal (mutable) set of source task ids */
+    public Set<Integer> getTasks() {
+        return tasks;
+    }
+
+    /** @return the internal (mutable) map from component id to target info */
+    public Map<String, TargetBackpressureInfo> getTargetTasks() {
+        return targetTasks;
+    }
+
+    /**
+     * @return the largest time stamp among all target entries, or 0 when there is none
+     *         or none is positive
+     */
+    public long getLastestTimeStamp() {
+        long latest = 0;
+        for (TargetBackpressureInfo target : targetTasks.values()) {
+            latest = Math.max(latest, target.getTimeStamp());
+        }
+        return latest;
+    }
+
+    /**
+     * @return the backpressure status of the target entry with the largest positive time
+     *         stamp, or null when no entry has a time stamp greater than 0
+     */
+    public EventType getLastestBackpressureEvent() {
+        EventType latestEvent = null;
+        long latestTime = 0;
+
+        for (TargetBackpressureInfo target : targetTasks.values()) {
+            long time = target.getTimeStamp();
+            if (time > latestTime) {
+                latestTime = time;
+                latestEvent = target.getBackpressureStatus();
+            }
+        }
+
+        return latestEvent;
+    }
+
+    /**
+     * @return the largest flow control time among all target entries, or 0 when there is
+     *         none or none is positive
+     */
+    public int getMaxFlowCtrlTime() {
+        int max = 0;
+        for (TargetBackpressureInfo target : targetTasks.values()) {
+            max = Math.max(max, target.getFlowCtrlTime());
+        }
+        return max;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java
new file mode 100644
index 0000000..2f6332b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+/**
+ * Serializable record of the backpressure state of a target: its tasks, the last
+ * backpressure status, the flow control time and a time stamp.
+ */
+public class TargetBackpressureInfo implements Serializable {
+    private static final long serialVersionUID = -1829897435773792484L;
+
+    private Set<Integer> tasks;
+
+    private EventType backpressureStatus;
+    private int flowCtrlTime;
+    private long timeStamp;
+
+    /** Creates an instance with no tasks, status defaultType, flow control time -1 and time stamp 0. */
+    public TargetBackpressureInfo() {
+        this(EventType.defaultType, -1, 0L);
+    }
+
+    /**
+     * Creates an instance with an empty task set and the given values.
+     *
+     * @param backpressureStatus initial backpressure status
+     * @param flowCtrlTime       initial flow control time
+     * @param time               initial time stamp
+     */
+    public TargetBackpressureInfo(EventType backpressureStatus, int flowCtrlTime, long time) {
+        this.tasks = new TreeSet<Integer>();
+        this.backpressureStatus = backpressureStatus;
+        this.flowCtrlTime = flowCtrlTime;
+        this.timeStamp = time;
+    }
+
+    /** @return the internal (mutable) set of task ids */
+    public Set<Integer> getTasks() {
+        return tasks;
+    }
+
+    public EventType getBackpressureStatus() {
+        return backpressureStatus;
+    }
+
+    public void setBackpressureStatus(EventType status) {
+        backpressureStatus = status;
+    }
+
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    public void setTimeStamp(long time) {
+        timeStamp = time;
+    }
+
+    public int getFlowCtrlTime() {
+        return flowCtrlTime;
+    }
+
+    public void setFlowCtrlTime(int time) {
+        flowCtrlTime = time;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}