You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the Apache mailing list archive.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:43 UTC

[05/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
index db2e990..7ef7144 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchReceiver.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.task.TaskReceiver.DeserializeRunnable;
 import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
 
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.BatchTuple;
@@ -32,27 +33,20 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.DisruptorQueue;
 
 public class TaskBatchReceiver extends TaskReceiver {
-    private static Logger LOG = LoggerFactory
-            .getLogger(TaskBatchReceiver.class);
+    private static Logger LOG = LoggerFactory.getLogger(TaskBatchReceiver.class);
 
-    public TaskBatchReceiver(Task task, int taskId, Map stormConf,
-            TopologyContext topologyContext,
-            Map<Integer, DisruptorQueue> innerTaskTransfer,
+    public TaskBatchReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer,
             TaskStatus taskStatus, String taskName) {
-        super(task, taskId, stormConf, topologyContext, innerTaskTransfer,
-                taskStatus, taskName);
+        super(task, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
     }
 
     @Override
     protected void setDeserializeThread() {
-        this.deserializeThread =
-                new AsyncLoopThread(new DeserializeBatchRunnable(
-                        deserializeQueue, innerTaskTransfer.get(taskId)));
+        this.deserializeThread = new AsyncLoopThread(new DeserializeBatchRunnable(deserializeQueue, innerTaskTransfer.get(taskId)));
     }
 
     public class DeserializeBatchRunnable extends DeserializeRunnable {
-        public DeserializeBatchRunnable(DisruptorQueue deserializeQueue,
-                DisruptorQueue exeQueue) {
+        public DeserializeBatchRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) {
             super(deserializeQueue, exeQueue);
         }
 
@@ -83,14 +77,11 @@ public class TaskBatchReceiver extends TaskReceiver {
                 return tuple;
             } catch (Throwable e) {
                 if (taskStatus.isShutdown() == false) {
-                    LOG.error(
-                            idStr + " recv thread error "
-                                    + JStormUtils.toPrintableString(ser_msg)
-                                    + "\n", e);
+                    LOG.error(idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
                 }
             } finally {
                 long end = System.nanoTime();
-                deserializeTimer.update((end - start)/1000000.0d);
+                deserializeTimer.update((end - start) / TimeUtils.NS_PER_US);
             }
 
             return null;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
index e10fe96..07d7cbb 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBatchTransfer.java
@@ -20,6 +20,7 @@ package com.alibaba.jstorm.task;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,13 +28,15 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.daemon.worker.WorkerData;
+import com.alibaba.jstorm.daemon.worker.timer.TaskBatchCheckTrigger;
+import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
+import com.alibaba.jstorm.utils.EventSampler;
+import com.alibaba.jstorm.utils.Pair;
 
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
 import backtype.storm.serialization.KryoTupleSerializer;
 import backtype.storm.tuple.BatchTuple;
+import backtype.storm.tuple.ITupleExt;
 import backtype.storm.tuple.TupleExt;
-import backtype.storm.utils.DisruptorQueue;
 
 /**
  * Batch Tuples, then send out
@@ -43,62 +46,84 @@ import backtype.storm.utils.DisruptorQueue;
  */
 public class TaskBatchTransfer extends TaskTransfer {
 
-    private static Logger LOG = LoggerFactory
-            .getLogger(TaskBatchTransfer.class);
-
+    private static Logger LOG = LoggerFactory.getLogger(TaskBatchTransfer.class);
+    protected static final double BATCH_SIZE_THRESHOLD = 2.0;
+    protected static final int BATCH_FLUSH_INTERVAL_MS = 5;
+    protected static final int BATCH_CHECK_INTERVAL_S = 3600;
+    protected static final int BATCH_EVENT_SAMPLER_INTERVAL_S = 4 * 240;
+    
     private Map<Integer, BatchTuple> batchMap;
+    private final int maxBatchSize;
     private int batchSize;
     private Object lock = new Object();
+    private EventSampler eventSampler = null;
 
-    public TaskBatchTransfer(Task task, String taskName,
-            KryoTupleSerializer serializer, TaskStatus taskStatus,
-            WorkerData workerData) {
+    public TaskBatchTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
         super(task, taskName, serializer, taskStatus, workerData);
 
         batchMap = new HashMap<Integer, BatchTuple>();
-        batchSize =
-                ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf());
+        maxBatchSize = ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf());
+        
+        
+        TaskBatchFlushTrigger batchFlushTrigger = new TaskBatchFlushTrigger(BATCH_FLUSH_INTERVAL_MS, taskName, this);
+        batchFlushTrigger.register(TimeUnit.MILLISECONDS);
+        
+        TaskBatchCheckTrigger batchCheckTrigger = new TaskBatchCheckTrigger(BATCH_CHECK_INTERVAL_S, taskName, this);
+        batchCheckTrigger.register();
+        
+        startCheck();
     }
+    
+    public void setBatchSize(int batchSize) {
+		this.batchSize = batchSize;
+		LOG.info(taskName + " set batch size as " + batchSize);
+	}
 
     @Override
     protected AsyncLoopThread setupSerializeThread() {
         return new AsyncLoopThread(new TransferBatchRunnable());
     }
+    
+    public void startCheck() {
+    	eventSampler = new EventSampler(BATCH_EVENT_SAMPLER_INTERVAL_S);
+    	setBatchSize(maxBatchSize);
+    	LOG.info("Start check batch size, task of  " + taskName);
+    }
+    
+    public void stopCheck() {
+    	eventSampler = null;
+    	LOG.info("Stop check batch size, task of  " + taskName);
+    }
 
-    @Override
-    public void transfer(TupleExt tuple) {
-        int targetTaskid = tuple.getTargetTaskId();
-        synchronized (lock) {
-            BatchTuple batch = getBatchTuple(targetTaskid);
+	@Override
+	public void push(int taskId, TupleExt tuple) {
+		synchronized (lock) {
+			BatchTuple batch = getBatchTuple(taskId);
 
-            batch.addToBatch(tuple);
-            if (batch.isBatchFull()) {
-                pushToQueue(targetTaskid, batch);
-            }
-        }
-    }
+			batch.addToBatch(tuple);
+			if (batch.isBatchFull()) {
+				serializeQueue.publish(batch);
+				batchMap.put(taskId, new BatchTuple(taskId, batchSize));
+			}
+		}
+
+	}
 
     public void flush() {
+    	Map<Integer, BatchTuple> oldBatchMap = null;
         synchronized (lock) {
-            for (Entry<Integer, BatchTuple> entry : batchMap.entrySet()) {
-                int taskId = entry.getKey();
-                BatchTuple batch = entry.getValue();
-                if (batch != null && batch.currBatchSize() > 0) {
-                    pushToQueue(taskId, batch);
-                }
+            oldBatchMap = batchMap;
+            batchMap = new HashMap<Integer, BatchTuple>();
+        }
+        
+        for (Entry<Integer, BatchTuple> entry : oldBatchMap.entrySet()) {
+            BatchTuple batch = entry.getValue();
+            if (batch != null && batch.currBatchSize() > 0) {
+            	serializeQueue.publish(batch);
             }
         }
     }
 
-    private void pushToQueue(int targetTaskid, BatchTuple batch) {
-        DisruptorQueue exeQueue = innerTaskTransfer.get(targetTaskid);
-        if (exeQueue != null) {
-            exeQueue.publish(batch);
-        } else {
-            serializeQueue.publish(batch);
-        }
-        resetBatchTuple(targetTaskid);
-    }
 
     private BatchTuple getBatchTuple(int targetTaskId) {
         BatchTuple ret = batchMap.get(targetTaskId);
@@ -109,33 +134,27 @@ public class TaskBatchTransfer extends TaskTransfer {
         return ret;
     }
 
-    private void resetBatchTuple(int targetTaskId) {
-        batchMap.put(targetTaskId, null);
-    }
 
     protected class TransferBatchRunnable extends TransferRunnable {
-        @Override
-        public void onEvent(Object event, long sequence, boolean endOfBatch)
-                throws Exception {
-
-            if (event == null) {
-                return;
-            }
-
-            long start = System.currentTimeMillis();
-            try {
-                BatchTuple tuple = (BatchTuple) event;
-                int taskid = tuple.getTargetTaskId();
-                byte[] tupleMessage = serializer.serializeBatch(tuple);
-                TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
-                IConnection conn = getConnection(taskid);
-                if (conn != null)
-                    conn.send(taskMessage);
-            } finally {
-                long end = System.currentTimeMillis();
-                timer.update(end - start);
-            }
 
+        public byte[] serialize(ITupleExt tuple) {
+    		BatchTuple batchTuple = (BatchTuple)tuple;
+    		if (eventSampler != null) {
+    			Pair<Integer, Double> result = eventSampler.avgCheck(batchTuple.currBatchSize());
+    			if (result != null) {
+    				Double avgBatchSize = result.getSecond();
+    				LOG.info(taskName + " batch average size is " + avgBatchSize);
+    				if (avgBatchSize < BATCH_SIZE_THRESHOLD) {
+    					LOG.info("Due to average size is small, so directly reset batch size as 1");
+    					// set the batch size as 1
+    					// transfer can directly send tuple, don't need wait flush interval
+    					setBatchSize(1);
+    				}
+    				stopCheck();
+    			}
+    			
+    		}
+        	return serializer.serializeBatch(batchTuple);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
index 0eb1c4b..ddfe63d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskInfo.java
@@ -54,11 +54,8 @@ public class TaskInfo implements Serializable {
 
     @Override
     public boolean equals(Object assignment) {
-        if (assignment instanceof TaskInfo
-                && ((TaskInfo) assignment).getComponentId().equals(
-                        getComponentId())
-                && ((TaskInfo) assignment).getComponentType().equals(
-                        componentType)) {
+        if (assignment instanceof TaskInfo && ((TaskInfo) assignment).getComponentId().equals(getComponentId())
+                && ((TaskInfo) assignment).getComponentType().equals(componentType)) {
             return true;
         }
         return false;
@@ -66,13 +63,11 @@ public class TaskInfo implements Serializable {
 
     @Override
     public int hashCode() {
-        return this.getComponentId().hashCode()
-                + this.getComponentType().hashCode();
+        return this.getComponentId().hashCode() + this.getComponentType().hashCode();
     }
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
index ad32ceb..230ba16 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskReceiver.java
@@ -17,32 +17,32 @@
  */
 package com.alibaba.jstorm.task;
 
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.serialization.KryoTupleDeserializer;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
 import backtype.storm.utils.WorkerClassLoader;
 
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
 import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.*;
 import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.esotericsoftware.kryo.KryoException;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
 public class TaskReceiver {
     private static Logger LOG = LoggerFactory.getLogger(TaskReceiver.class);
 
@@ -58,13 +58,11 @@ public class TaskReceiver {
     protected DisruptorQueue deserializeQueue;
     protected KryoTupleDeserializer deserializer;
     protected AsyncLoopThread deserializeThread;
-    protected Histogram deserializeTimer;
+    protected AsmHistogram deserializeTimer;
 
     protected TaskStatus taskStatus;
 
-    public TaskReceiver(Task task, int taskId, Map stormConf,
-            TopologyContext topologyContext,
-            Map<Integer, DisruptorQueue> innerTaskTransfer,
+    public TaskReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer,
             TaskStatus taskStatus, String taskName) {
         this.task = task;
         this.taskId = taskId;
@@ -77,34 +75,24 @@ public class TaskReceiver {
 
         this.isDebugRecv = ConfigExtension.isTopologyDebugRecvTuple(stormConf);
 
-        int queueSize =
-                JStormUtils
-                        .parseInt(
-                                stormConf
-                                        .get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE),
-                                256);
-
-        WaitStrategy waitStrategy =
-                (WaitStrategy) JStormUtils
-                        .createDisruptorWaitStrategy(stormConf);
-        this.deserializeQueue =
-                DisruptorQueue.mkInstance("TaskDeserialize",
-                        ProducerType.MULTI, queueSize, waitStrategy);
+        int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
+
+        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
+        this.deserializeQueue = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.MULTI, queueSize, waitStrategy);
         setDeserializeThread();
-        this.deserializer =
-                new KryoTupleDeserializer(stormConf, topologyContext);
+        this.deserializer = new KryoTupleDeserializer(stormConf, topologyContext);
+
+        String topologyId = topologyContext.getTopologyId();
+        String component = topologyContext.getThisComponentId();
 
         deserializeTimer =
-                JStormMetrics.registerTaskHistogram(taskId,
-                        MetricDef.DESERIALIZE_TIME);
-
-        QueueGauge deserializeQueueGauge =
-                new QueueGauge(idStr + MetricDef.DESERIALIZE_QUEUE,
-                        deserializeQueue);
-        JStormMetrics.registerTaskGauge(deserializeQueueGauge, taskId,
-                MetricDef.DESERIALIZE_QUEUE);
-        JStormHealthCheck.registerTaskHealthCheck(taskId,
-                MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge);
+                (AsmHistogram) JStormMetrics.registerTaskMetric(
+                        MetricUtils.taskMetricName(topologyId, component, taskId, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
+
+        QueueGauge deserializeQueueGauge = new QueueGauge(deserializeQueue, idStr, MetricDef.DESERIALIZE_QUEUE);
+        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, component, taskId, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE),
+                new AsmGauge(deserializeQueueGauge));
+        JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge);
     }
 
     public AsyncLoopThread getDeserializeThread() {
@@ -112,9 +100,7 @@ public class TaskReceiver {
     }
 
     protected void setDeserializeThread() {
-        this.deserializeThread =
-                new AsyncLoopThread(new DeserializeRunnable(deserializeQueue,
-                        innerTaskTransfer.get(taskId)));
+        this.deserializeThread = new AsyncLoopThread(new DeserializeRunnable(deserializeQueue, innerTaskTransfer.get(taskId)));
     }
 
     public DisruptorQueue getDeserializeQueue() {
@@ -126,8 +112,7 @@ public class TaskReceiver {
         DisruptorQueue deserializeQueue;
         DisruptorQueue exeQueue;
 
-        DeserializeRunnable(DisruptorQueue deserializeQueue,
-                DisruptorQueue exeQueue) {
+        DeserializeRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) {
             this.deserializeQueue = deserializeQueue;
             this.exeQueue = exeQueue;
         }
@@ -162,24 +147,22 @@ public class TaskReceiver {
                 }
 
                 return tuple;
+            } catch (KryoException e) {
+                throw new RuntimeException(e);
             } catch (Throwable e) {
                 if (taskStatus.isShutdown() == false) {
-                    LOG.error(
-                            idStr + " recv thread error "
-                                    + JStormUtils.toPrintableString(ser_msg)
-                                    + "\n", e);
+                    LOG.error(idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
                 }
             } finally {
                 long end = System.nanoTime();
-                deserializeTimer.update((end - start)/1000000.0d);
+                deserializeTimer.update((end - start) / TimeUtils.NS_PER_US);
             }
 
             return null;
         }
 
         @Override
-        public void onEvent(Object event, long sequence, boolean endOfBatch)
-                throws Exception {
+        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
             Object tuple = deserialize((byte[]) event);
 
             if (tuple != null) {
@@ -189,7 +172,7 @@ public class TaskReceiver {
 
         @Override
         public void preRun() {
-            WorkerClassLoader.switchThreadContext();  
+            WorkerClassLoader.switchThreadContext();
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
index c49e9fc..d685c04 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskShutdownDameon.java
@@ -17,36 +17,31 @@
  */
 package com.alibaba.jstorm.task;
 
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.spout.ISpout;
 import backtype.storm.task.IBolt;
-import backtype.storm.topology.IConfig;
+import backtype.storm.topology.IDynamicComponent;
 import backtype.storm.utils.WorkerClassLoader;
-
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.daemon.worker.ShutdownableDameon;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
 import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
 
 /**
  * shutdown one task
  * 
  * @author yannian/Longda
- * 
  */
 public class TaskShutdownDameon implements ShutdownableDameon {
-    private static Logger LOG = LoggerFactory
-            .getLogger(TaskShutdownDameon.class);
+    private static Logger LOG = LoggerFactory.getLogger(TaskShutdownDameon.class);
 
     public static final byte QUIT_MSG = (byte) 0xff;
 
+    private Task task;
     private TaskStatus taskStatus;
     private String topology_id;
     private Integer task_id;
@@ -55,16 +50,15 @@ public class TaskShutdownDameon implements ShutdownableDameon {
     private Object task_obj;
     private boolean isClosed = false;
 
-    public TaskShutdownDameon(TaskStatus taskStatus, String topology_id,
-            Integer task_id, List<AsyncLoopThread> all_threads,
-            StormClusterState zkCluster, Object task_obj) {
+    public TaskShutdownDameon(TaskStatus taskStatus, String topology_id, Integer task_id, List<AsyncLoopThread> all_threads, StormClusterState zkCluster,
+            Object task_obj, Task task) {
         this.taskStatus = taskStatus;
         this.topology_id = topology_id;
         this.task_id = task_id;
         this.all_threads = all_threads;
         this.zkCluster = zkCluster;
         this.task_obj = task_obj;
-
+        this.task = task;
     }
 
     @Override
@@ -104,18 +98,9 @@ public class TaskShutdownDameon implements ShutdownableDameon {
         closeComponent(task_obj);
 
         try {
-        	JStormMetrics.unregisterTask(task_id);
-            TaskHeartbeatRunable.unregisterTaskStats(task_id);
-            zkCluster.remove_task_heartbeat(topology_id, task_id);
+            zkCluster.disconnect();
         } catch (Exception e) {
-            // TODO Auto-generated catch block
-            LOG.info("Failed to cleanup");
-        } finally {
-            try {
-                zkCluster.disconnect();
-            } catch (Exception e) {
-                LOG.info("Failed to disconnect", e);
-            }
+            LOG.error("Failed to disconnect zk for task-" + task_id);
         }
 
         LOG.info("Successfully shutdown task " + topology_id + ":" + task_id);
@@ -170,19 +155,22 @@ public class TaskShutdownDameon implements ShutdownableDameon {
         }
     }
 
-    public void updateConf(Map conf) {
-        if (task_obj instanceof IConfig) {
-            ((IConfig) task_obj).updateConf(conf);
+    public void update(Map conf) {
+        if (task_obj instanceof IDynamicComponent) {
+            ((IDynamicComponent) task_obj).update(conf);
         }
     }
 
     @Override
     public void run() {
-        // TODO Auto-generated method stub
         shutdown();
     }
 
     public int getTaskId() {
         return this.task_id;
     }
+
+    public Task getTask() {
+        return this.task;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
index efe6dee..4da4330 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskTransfer.java
@@ -24,40 +24,44 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.serialization.KryoTupleSerializer;
-import backtype.storm.tuple.TupleExt;
-import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.WorkerClassLoader;
-
 import com.alibaba.jstorm.callback.AsyncLoopRunnable;
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
 import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.common.metric.Timer;
 import com.alibaba.jstorm.daemon.worker.WorkerData;
 import com.alibaba.jstorm.metric.JStormHealthCheck;
 import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.backpressure.BackpressureController;
 import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
 
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.serialization.KryoTupleSerializer;
+import backtype.storm.tuple.ITupleExt;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.utils.DisruptorQueue;
+import backtype.storm.utils.Utils;
+import backtype.storm.utils.WorkerClassLoader;
+
 /**
  * Sending entrance
- * 
+ * <p/>
  * Task sending all tuples through this Object
- * 
+ * <p/>
  * Serialize the Tuple and put the serialized data to the sending queue
  * 
  * @author yannian
- * 
  */
 public class TaskTransfer {
 
@@ -71,15 +75,18 @@ public class TaskTransfer {
     protected final AsyncLoopThread serializeThread;
     protected volatile TaskStatus taskStatus;
     protected String taskName;
-    protected Timer timer;
+    protected AsmHistogram serializeTimer;
     protected Task task;
-    
+    protected String topolgyId;
+    protected String componentId;
+    protected int taskId;
+
     protected ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
     protected ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
 
-    public TaskTransfer(Task task, String taskName,
-            KryoTupleSerializer serializer, TaskStatus taskStatus,
-            WorkerData workerData) {
+    protected BackpressureController backpressureController;
+
+    public TaskTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
         this.task = task;
         this.taskName = taskName;
         this.serializer = serializer;
@@ -87,49 +94,53 @@ public class TaskTransfer {
         this.storm_conf = workerData.getStormConf();
         this.transferQueue = workerData.getTransferQueue();
         this.innerTaskTransfer = workerData.getInnerTaskTransfer();
-        
+
         this.nodeportSocket = workerData.getNodeportSocket();
         this.taskNodeport = workerData.getTaskNodeport();
 
-        int queue_size =
-                Utils.getInt(storm_conf
-                        .get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
-        WaitStrategy waitStrategy =
-                (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
-        this.serializeQueue =
-                DisruptorQueue.mkInstance(taskName, ProducerType.MULTI,
-                        queue_size, waitStrategy);
+        this.topolgyId = workerData.getTopologyId();
+        this.componentId = this.task.getComponentId();
+        this.taskId = this.task.getTaskId();
+
+        int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
+        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
+        this.serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, queue_size, waitStrategy);
         this.serializeQueue.consumerStarted();
 
         String taskId = taskName.substring(taskName.indexOf(":") + 1);
-        String metricName =
-                MetricRegistry.name(MetricDef.SERIALIZE_QUEUE, taskName);
-        QueueGauge serializeQueueGauge =
-                new QueueGauge(metricName, serializeQueue);
-        JStormMetrics.registerTaskGauge(serializeQueueGauge,
-                Integer.valueOf(taskId), MetricDef.SERIALIZE_QUEUE);
-        JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId),
-                MetricDef.SERIALIZE_QUEUE, serializeQueueGauge);
-        timer =
-                JStormMetrics.registerTaskTimer(Integer.valueOf(taskId),
-                        MetricDef.SERIALIZE_TIME);
+        QueueGauge serializeQueueGauge = new QueueGauge(serializeQueue, taskName, MetricDef.SERIALIZE_QUEUE);
+        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topolgyId, componentId, this.taskId, MetricDef.SERIALIZE_QUEUE, MetricType.GAUGE),
+                new AsmGauge(serializeQueueGauge));
+        JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId), MetricDef.SERIALIZE_QUEUE, serializeQueueGauge);
+        serializeTimer =
+                (AsmHistogram) JStormMetrics.registerTaskMetric(
+                        MetricUtils.taskMetricName(topolgyId, componentId, this.taskId, MetricDef.SERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
 
         serializeThread = setupSerializeThread();
+
+        backpressureController = new BackpressureController(storm_conf, task.getTaskId(), serializeQueue, queue_size);
         LOG.info("Successfully start TaskTransfer thread");
 
     }
 
     public void transfer(TupleExt tuple) {
 
-        int taskid = tuple.getTargetTaskId();
+        int taskId = tuple.getTargetTaskId();
 
-        DisruptorQueue exeQueue = innerTaskTransfer.get(taskid);
+        DisruptorQueue exeQueue = innerTaskTransfer.get(taskId);
         if (exeQueue != null) {
             exeQueue.publish(tuple);
         } else {
-            serializeQueue.publish(tuple);
+            push(taskId, tuple);
         }
 
+        if (backpressureController.isBackpressureMode()) {
+            backpressureController.flowControl();
+        }
+    }
+    
+    public void push(int taskId, TupleExt tuple) {
+    	serializeQueue.publish(tuple);
     }
 
     protected AsyncLoopThread setupSerializeThread() {
@@ -140,6 +151,10 @@ public class TaskTransfer {
         return serializeThread;
     }
 
+    public BackpressureController getBackpressureController() {
+        return backpressureController;
+    }
+
     protected class TransferRunnable extends RunnableCallback implements EventHandler {
 
         private AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();
@@ -148,6 +163,7 @@ public class TaskTransfer {
         public String getThreadName() {
             return taskName + "-" + TransferRunnable.class.getSimpleName();
         }
+		
 
         @Override
         public void preRun() {
@@ -156,61 +172,80 @@ public class TaskTransfer {
 
         @Override
         public void run() {
-
             while (shutdown.get() == false) {
                 serializeQueue.consumeBatchWhenAvailable(this);
-
             }
-
         }
 
         @Override
         public void postRun() {
             WorkerClassLoader.restoreThreadContext();
         }
+        
+        public byte[] serialize(ITupleExt tuple) {
+        	return serializer.serialize((TupleExt)tuple);
+        }
 
         @Override
-        public void onEvent(Object event, long sequence, boolean endOfBatch)
-                throws Exception {
+        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
 
             if (event == null) {
                 return;
             }
 
-            long start = System.currentTimeMillis();
+            long start = System.nanoTime();
 
             try {
-                TupleExt tuple = (TupleExt) event;
+			
+			    ITupleExt tuple = (ITupleExt) event;
                 int taskid = tuple.getTargetTaskId();
-                byte[] tupleMessage = serializer.serialize(tuple);
-                TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
                 IConnection conn = getConnection(taskid);
                 if (conn != null) {
+                	byte[] tupleMessage = serialize(tuple);
+                    TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
                     conn.send(taskMessage);
                 }
             } finally {
-                long end = System.currentTimeMillis();
-                timer.update(end - start);
+                long end = System.nanoTime();
+                serializeTimer.update((end - start)/TimeUtils.NS_PER_US);
             }
 
         }
-        
+
         protected IConnection getConnection(int taskId) {
             IConnection conn = null;
             WorkerSlot nodePort = taskNodeport.get(taskId);
             if (nodePort == null) {
-                String errormsg = "can`t not found IConnection to " + taskId;
-                LOG.warn("Intra transfer warn", new Exception(errormsg));
+                String errormsg = "IConnection to " + taskId + " can't be found";
+                LOG.warn("Internal transfer warn, throw tuple,", new Exception(errormsg));
             } else {
                 conn = nodeportSocket.get(nodePort);
                 if (conn == null) {
-                    String errormsg = "can`t not found nodePort " + nodePort;
-                    LOG.warn("Intra transfer warn", new Exception(errormsg));
+                    String errormsg = "NodePort to " + nodePort + " can't be found";
+                    LOG.warn("Internal transfer warn, throw tuple,", new Exception(errormsg));
                 }
             }
             return conn;
         }
 
+        protected void pullTuples(Object event) {
+            TupleExt tuple = (TupleExt) event;
+            int taskid = tuple.getTargetTaskId();
+            IConnection conn = getConnection(taskid);
+            if (conn != null) {
+                while (conn.available() == false) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+
+                    }
+                }
+                byte[] tupleMessage = serializer.serialize(tuple);
+                TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
+                conn.send(taskMessage);
+            }
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
index 596fa35..f703f25 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TkHbCacheTime.java
@@ -17,12 +17,12 @@
  */
 package com.alibaba.jstorm.task;
 
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
+import backtype.storm.generated.TaskHeartbeat;
+
 import com.alibaba.jstorm.utils.TimeUtils;
 
 /**
- * TkHbCacheTime is describle taskheartcache (Map<topologyId, Map<taskid,
- * Map<tkHbCacheTime, time>>>)
+ * TkHbCacheTime describes the task heartbeat cache (Map<topologyId, Map<taskid, Map<tkHbCacheTime, time>>>)
  */
 
 public class TkHbCacheTime {
@@ -54,12 +54,13 @@ public class TkHbCacheTime {
         this.taskAssignedTime = taskAssignedTime;
     }
 
-    public void update(TaskHeartbeat zkTaskHeartbeat) {
-        int nowSecs = TimeUtils.current_time_secs();
-        this.nimbusTime = nowSecs;
-        this.taskReportedTime = zkTaskHeartbeat.getTimeSecs();
-        this.taskAssignedTime =
-                zkTaskHeartbeat.getTimeSecs() - zkTaskHeartbeat.getUptimeSecs();
+    public void update(TaskHeartbeat taskHeartbeat) {
+        if (taskHeartbeat != null) {
+            int nowSecs = TimeUtils.current_time_secs();
+            this.nimbusTime = nowSecs;
+            this.taskReportedTime = taskHeartbeat.get_time();
+            this.taskAssignedTime = taskHeartbeat.get_time() - taskHeartbeat.get_uptime();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
index 2be1592..4deb8ed 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/acker/Acker.java
@@ -17,25 +17,21 @@
  */
 package com.alibaba.jstorm.task.acker;
 
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.task.IBolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
-
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.RotatingMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
 
 /**
- * 
  * @author yannian/Longda
- * 
  */
 public class Acker implements IBolt {
 
@@ -57,27 +53,19 @@ public class Acker implements IBolt {
     private long rotateTime;
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collector) {
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;
         // pending = new TimeCacheMap<Object, AckObject>(timeoutSec,
         // TIMEOUT_BUCKET_NUM);
         this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
-        this.rotateTime =
-                1000L
-                        * JStormUtils.parseInt(stormConf
-                                .get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30)
-                        / (TIMEOUT_BUCKET_NUM - 1);
+        this.rotateTime = 1000L * JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30) / (TIMEOUT_BUCKET_NUM - 1);
     }
 
     @Override
     public void execute(Tuple input) {
         Object id = input.getValue(0);
-
         AckObject curr = pending.get(id);
-
         String stream_id = input.getSourceStreamId();
-
         if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {
             if (curr == null) {
                 curr = new AckObject();
@@ -95,17 +83,13 @@ public class Acker implements IBolt {
         } else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {
             if (curr != null) {
                 curr.update_ack(input.getValue(1));
-
             } else {
                 // two case
                 // one is timeout
                 // the other is bolt's ack first come
                 curr = new AckObject();
-
-                curr.val = Long.valueOf(input.getLong(1));
-
+                curr.val = input.getLong(1);
                 pending.put(id, curr);
-
             }
         } else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {
             if (curr == null) {
@@ -113,31 +97,23 @@ public class Acker implements IBolt {
                 // already timeout, should go fail
                 return;
             }
-
             curr.failed = true;
-
         } else {
             LOG.info("Unknow source stream");
             return;
         }
 
         Integer task = curr.spout_task;
-
         if (task != null) {
-
             if (curr.val == 0) {
                 pending.remove(id);
                 List values = JStormUtils.mk_list(id);
-
                 collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
-
             } else {
-
                 if (curr.failed) {
                     pending.remove(id);
                     List values = JStormUtils.mk_list(id);
-                    collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID,
-                            values);
+                    collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values);
                 }
             }
         } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java
new file mode 100644
index 0000000..528fc6b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/Backpressure.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.util.Map;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+public abstract class Backpressure {
+    private static final String BACKPRESSURE_DELAY_TIME = "topology.backpressure.delay.time";
+
+    protected volatile boolean isBackpressureEnable;
+
+    protected volatile double highWaterMark;
+    protected volatile double lowWaterMark;
+
+    protected volatile double triggerBpRatio;
+
+    protected volatile long sleepTime;
+
+    public Backpressure(Map stormConf) {
+        this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf);
+        this.highWaterMark = ConfigExtension.getBackpressureWaterMarkHigh(stormConf);
+        this.lowWaterMark = ConfigExtension.getBackpressureWaterMarkLow(stormConf);
+        this.triggerBpRatio = ConfigExtension.getBackpressureCoordinatorRatio(stormConf);
+    }
+
+    protected void updateConfig(Map stormConf) {
+        if (stormConf == null) {
+            return;
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE)) {
+            this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf);
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH)) {
+            this.highWaterMark = ConfigExtension.getBackpressureWaterMarkHigh(stormConf);
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW)) {
+            this.lowWaterMark = ConfigExtension.getBackpressureWaterMarkLow(stormConf);
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO)) {
+            this.triggerBpRatio = ConfigExtension.getBackpressureCoordinatorRatio(stormConf);
+        }
+
+        if (stormConf.containsKey(BACKPRESSURE_DELAY_TIME)) {
+            long time = JStormUtils.parseLong(stormConf.get(BACKPRESSURE_DELAY_TIME), 0l);
+            if (time != 0l) {
+                this.sleepTime = time;
+            }
+        }
+    }
+
+    public boolean isBackpressureConfigChange(Map stormConf) {
+        if (stormConf == null) {
+            return false;
+        }
+
+        if (stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE) || 
+                stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH) || 
+                stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW) || 
+                stormConf.containsKey(ConfigExtension.TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO) ||
+                stormConf.containsKey(BACKPRESSURE_DELAY_TIME)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java
new file mode 100644
index 0000000..82dd938
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureController.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.task.TaskTransfer;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.DisruptorQueue;
+
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+/**
+ * Flow Control
+ * 
+ * @author Basti Liu
+ */
+public class BackpressureController extends Backpressure {
+    private static Logger LOG = LoggerFactory.getLogger(BackpressureController.class);
+
+    private int taskId;
+    private DisruptorQueue queueControlled;
+    private int totalQueueSize;
+    private int queueSizeReduced;
+
+    private boolean isBackpressureMode = false;
+
+    private SpoutOutputCollector outputCollector;
+
+    private long maxBound, minBound;
+
+    public BackpressureController(Map conf, int taskId, DisruptorQueue queue, int queueSize) {
+        super(conf);
+        this.queueControlled = queue;
+        this.totalQueueSize = queueSize;
+        this.queueSizeReduced = queueSize;
+        this.taskId = taskId;
+        this.maxBound = 0l;
+        this.minBound = 0l;
+    }
+
+    public void setOutputCollector(SpoutOutputCollector outputCollector) {
+        this.outputCollector = outputCollector;
+    }
+
+    public void control(TopoMasterCtrlEvent ctrlEvent) {
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        EventType eventType = ctrlEvent.getEventType();
+        LOG.debug("Received control event, " + eventType.toString());
+        if (eventType.equals(EventType.startBackpressure)) {
+            List<Object> value = ctrlEvent.getEventValue();
+            int flowCtrlTime = value.get(0) != null ? (Integer) value.get(0) : 0;
+            start(flowCtrlTime);
+        } else if (eventType.equals(EventType.stopBackpressure)) {
+            stop();
+        } else if (eventType.equals(EventType.updateBackpressureConfig)) {
+            List<Object> value = ctrlEvent.getEventValue();
+            if (value != null) {
+                Map stormConf = (Map) value.get(0);
+                updateConfig(stormConf);
+
+                if (isBackpressureEnable == false) {
+                    LOG.info("Disable backpressure in controller.");
+                    resetBackpressureInfo();
+                } else {
+                    LOG.info("Enable backpressure in controller");
+                }
+            }
+        }
+    }
+
+    public void flowControl() {
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        try {
+            Thread.sleep(sleepTime);
+            while (isQueueCapacityAvailable() == false) {
+                Thread.sleep(1);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Sleep was interrupted!");
+        }
+    }
+
+    private void resetBackpressureInfo() {
+        sleepTime = 0l;
+        maxBound = 0l;
+        minBound = 0l;
+        queueSizeReduced = totalQueueSize;
+        isBackpressureMode = false;
+    }
+
+    private void start(int flowCtrlTime) {
+        if (flowCtrlTime > 0) {
+            if (maxBound < flowCtrlTime) {
+                sleepTime = flowCtrlTime;
+            } else if (maxBound == flowCtrlTime) {
+                if (sleepTime >= maxBound) {
+                    sleepTime++;
+                } else {
+                    sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true);
+                } 
+            } else {
+                if (maxBound <= sleepTime) {
+                    sleepTime++;
+                } else {
+                    if (sleepTime >= flowCtrlTime) {
+                        sleepTime = JStormUtils.halfValueOfSum(maxBound, sleepTime, true);
+                    } else {
+                        sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true);
+                    }
+                }
+            }
+        } else {
+            sleepTime++;
+        }
+        if (sleepTime > maxBound) {
+            maxBound = sleepTime;
+        }
+
+        int size = totalQueueSize / 100;
+        queueSizeReduced = size > 10 ? size : 10;
+        isBackpressureMode = true;
+
+        LOG.info("Start backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}", taskId, sleepTime, queueSizeReduced, flowCtrlTime);
+    }
+
+    private void stop() {
+        if (sleepTime == minBound) {
+            minBound = 0;
+        }
+        sleepTime = JStormUtils.halfValueOfSum(minBound, sleepTime, false);
+
+        if (sleepTime == 0) {
+            resetBackpressureInfo();
+
+            TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
+            outputCollector.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopBp));
+        } else {
+            minBound = sleepTime;
+        }
+
+        LOG.info("Stop backpressure at spout-{}, sleepTime={}, queueSizeReduced={}", taskId, sleepTime, queueSizeReduced);
+    }
+
+    public boolean isBackpressureMode() {
+        return isBackpressureMode && isBackpressureEnable;
+    }
+
+    public boolean isQueueCapacityAvailable() {
+        return (queueControlled.population() < queueSizeReduced);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java
new file mode 100644
index 0000000..d270078
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureCoordinator.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import backtype.storm.generated.*;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.*;
+import com.alibaba.jstorm.task.acker.Acker;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * Coordinator is responsible for the request from trigger and controller.
+ * - Event from trigger:
+ *   Find the related controllers (source spouts), and decide whether the request needs to be sent out.
+ * - Event from controller: 
+ *   If backpressure stop event, send stop request to all target triggers.
+ * 
+ * @author Basti Liu
+ */
+public class BackpressureCoordinator extends Backpressure {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureCoordinator.class);
+
+    private static final int adjustedTime = 5;
+
+    private TopologyContext context;
+    private StormTopology topology;
+    private OutputCollector output;
+
+    private int topologyMasterId;
+    private Map<Integer, String> taskIdToComponentId;
+    private Map<String, SpoutSpec> spouts;
+    private Map<String, Bolt> bolts;
+
+    // Map<source componentId, Map<ComponentId, backpressure info>>
+    private Map<String, SourceBackpressureInfo> SourceTobackpressureInfo;
+
+    private Integer period;
+
+    private StormClusterState zkCluster;
+    private static final String BACKPRESSURE_TAG = "Backpressure has been ";
+
+
+    public BackpressureCoordinator(OutputCollector output, TopologyContext topologyContext, Integer taskId) {
+        super(topologyContext.getStormConf());
+        this.context = topologyContext;
+        this.topology = topologyContext.getRawTopology();
+        this.spouts = new HashMap<String, SpoutSpec>(); 
+        if (this.topology.get_spouts() != null) {
+            this.spouts.putAll(this.topology.get_spouts());
+        }
+        this.bolts = new HashMap<String, Bolt>(); 
+        if (this.topology.get_bolts() != null) {
+            this.bolts.putAll(this.topology.get_bolts());
+        }
+        this.taskIdToComponentId = topologyContext.getTaskToComponent();
+        this.topologyMasterId = taskId;
+
+        this.output = output;
+
+        int checkInterval = ConfigExtension.getBackpressureCheckIntervl(context.getStormConf());
+        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(context.getStormConf());
+        this.period = checkInterval * sampleNum;
+
+        this.zkCluster = topologyContext.getZkCluster();
+        try {
+            this.SourceTobackpressureInfo = zkCluster.get_backpressure_info(context.getTopologyId());
+            if (this.SourceTobackpressureInfo == null) {
+                this.SourceTobackpressureInfo = new HashMap<String, SourceBackpressureInfo>();
+            } else {
+                LOG.info("Successfully retrieve existing SourceTobackpressureInfo from zk: " + SourceTobackpressureInfo);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get SourceTobackpressureInfo from zk", e);
+            this.SourceTobackpressureInfo = new HashMap<String, SourceBackpressureInfo>();
+        }
+    }
+
+    private Set<String> getInputSpoutsForBolt(StormTopology topology, String boltComponentId, Set<String> componentsTraversed) {
+        Set<String> ret = new TreeSet<String>();
+
+        if (componentsTraversed == null) {
+            componentsTraversed = new HashSet<String>();
+        }
+
+        Bolt bolt = bolts.get(boltComponentId);
+        if (bolt == null) {
+            return ret;
+        }
+
+        ComponentCommon common = bolt.get_common();
+        Set<GlobalStreamId> inputstreams = common.get_inputs().keySet();
+        Set<String> inputComponents = new TreeSet<String>();
+        for (GlobalStreamId streamId : inputstreams) {
+            inputComponents.add(streamId.get_componentId());
+        }
+
+        Set<String> spoutComponentIds = new HashSet<String>(spouts.keySet());
+        Set<String> boltComponentIds = new HashSet<String>(bolts.keySet());
+        for (String inputComponent : inputComponents) {
+            // Skip the components which has been traversed before, to avoid dead loop when there are loop bolts in topology
+            if (componentsTraversed.contains(inputComponent)) {
+                continue;
+            } else {
+                componentsTraversed.add(inputComponent);
+            }
+
+            if (spoutComponentIds.contains(inputComponent)) {
+                ret.add(inputComponent);
+            } else if (boltComponentIds.contains(inputComponent)) {
+                Set<String> inputs = getInputSpoutsForBolt(topology, inputComponent, componentsTraversed);
+                ret.addAll(inputs);
+            }
+        }
+
+        return ret;
+    }
+
+    public void process(Tuple input) {
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        int sourceTask = input.getSourceTask();
+        String componentId = taskIdToComponentId.get(sourceTask);
+        if (componentId == null) {
+            LOG.warn("Receive tuple from unknown task-" + sourceTask);
+            return;
+        }
+
+        if (spouts.keySet().contains(componentId)) {
+            if (SourceTobackpressureInfo.get(componentId) != null) {
+                handleEventFromSpout(sourceTask, input);
+            }
+        } else if (bolts.keySet().contains(componentId)) {
+            handleEventFromBolt(sourceTask, input);
+        }
+    }
+
+    public void updateBackpressureConfig(Map conf) {
+        updateConfig(conf);
+
+        if (isBackpressureEnable == false) {
+            LOG.info("Disable backpressure in coordinator.");
+            SourceTobackpressureInfo.clear();
+        } else {
+            LOG.info("Enable backpressure in coordinator.");
+        }
+
+        TopoMasterCtrlEvent updateBpConfig = new TopoMasterCtrlEvent(EventType.updateBackpressureConfig, new ArrayList<Object>());
+        updateBpConfig.addEventValue(conf);
+        Values values = new Values(updateBpConfig);
+        Set<Integer> targetTasks = new TreeSet<Integer>(taskIdToComponentId.keySet());
+        targetTasks.remove(topologyMasterId);
+        targetTasks.removeAll(context.getComponentTasks(Acker.ACKER_COMPONENT_ID));
+        sendBackpressureMessage(targetTasks, values, EventType.updateBackpressureConfig);
+
+        reportBackpressureStatus();
+    }
+
+    private boolean checkSpoutsUnderBackpressure(Set<String> spouts) {
+        boolean ret = false;
+
+        if (spouts != null) {
+            for (String spout : spouts) {
+                SourceBackpressureInfo backpressureInfo = SourceTobackpressureInfo.get(spout);
+                if (backpressureInfo != null && backpressureInfo.getTasks().size() > 0) {
+                    ret = true;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private TargetBackpressureInfo getBackpressureInfoBySourceSpout(String sourceSpout, String targetComponentId, boolean created) {
+        TargetBackpressureInfo ret = null;
+
+        SourceBackpressureInfo info = SourceTobackpressureInfo.get(sourceSpout);
+        if (info == null) {
+            if (created) {
+                info = new SourceBackpressureInfo();
+                SourceTobackpressureInfo.put(sourceSpout, info);
+            }
+        } else {
+            ret = info.getTargetTasks().get(targetComponentId);
+        }
+    
+        if (ret == null && created) {
+            ret = new TargetBackpressureInfo();
+            info.getTargetTasks().put(targetComponentId, ret);
+        }
+        return ret;
+    }
+
+    private boolean checkIntervalExpired(long time) {
+        boolean ret = false;
+        if (time != 0) {
+            if (System.currentTimeMillis() - time > period) {
+                ret = true;
+            }
+        }
+        return ret;
+    }
+
+    private void sendBackpressureMessage(Set<Integer> targetTasks, Values value, EventType backpressureType) {
+        for (Integer taskId : targetTasks) {
+            output.emitDirect(taskId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, value);
+            LOG.debug("Send " + backpressureType.toString() + " request to taskId-" + taskId);
+        }
+    }
+
+    /**
+     * Handles a control event reported by a spout task; only stopBackpressure
+     * is expected here. The spout task is removed from the backpressure record
+     * and, when it was the last recorded task of its component, the requesting
+     * bolt tasks are notified to leave flow-control mode (provided none of
+     * their other source spouts is still under backpressure).
+     *
+     * @param sourceTask task id of the reporting spout
+     * @param input      tuple whose "ctrlEvent" field is a TopoMasterCtrlEvent
+     */
+    private void handleEventFromSpout(int sourceTask, Tuple input) {
+        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
+        EventType type = ctrlEvent.getEventType();
+
+        boolean update = false;
+        if (type.equals(EventType.stopBackpressure)) {
+            String spoutComponentId = taskIdToComponentId.get(sourceTask);
+            // NOTE(review): the whole entry is removed from SourceTobackpressureInfo
+            // up front, even when other tasks of this spout component are still
+            // recorded in info.getTasks() below -- confirm this is intended.
+            SourceBackpressureInfo info = SourceTobackpressureInfo.remove(spoutComponentId);
+            if (info != null) {
+                info.getTasks().remove(sourceTask);
+                if (info.getTasks().size() == 0) {
+                    // Last spout task released: notify every bolt component that had
+                    // requested backpressure from this spout.
+                    for (Entry<String, TargetBackpressureInfo> entry : info.getTargetTasks().entrySet()) {
+                        String componentId = entry.getKey();
+
+                        // Make sure if all source spouts for this bolt are NOT under backpressure mode.
+                        Set<String> sourceSpouts = getInputSpoutsForBolt(topology, componentId, null);
+                        if (checkSpoutsUnderBackpressure(sourceSpouts) == false) {
+                            Set<Integer> tasks = new TreeSet<Integer>();
+                            tasks.addAll(context.getComponentTasks(componentId));
+                            sendBackpressureMessage(tasks, new Values(ctrlEvent), type);
+                        }
+                    }
+                }
+                update = true;
+            } else {
+                LOG.error("Received event from non-recorded spout-" + sourceTask);
+            }
+
+        } else {
+            LOG.warn("Received unexpected event, " + type.toString());
+        }
+
+        // If task set under backpressure has been changed, report the latest status
+        if (update) {
+            reportBackpressureStatus();
+        }
+    }
+
+    /**
+     * Handles a flow-control request reported by a bolt task.
+     *
+     * startBackpressure: records the requesting bolt task under every source
+     * spout of its component; once the ratio of requesting bolt tasks reaches
+     * triggerBpRatio, the spout tasks are put under backpressure and notified
+     * with the flow-control time.
+     *
+     * stopBackpressure: removes the bolt task from the records and, once the
+     * report interval has expired, notifies the spout tasks to stop. The bolt
+     * task itself is released when none of its source spouts remains under
+     * backpressure.
+     *
+     * @param sourceTask task id of the reporting bolt
+     * @param input      tuple whose "ctrlEvent" field is a TopoMasterCtrlEvent
+     */
+    private void handleEventFromBolt(int sourceTask, Tuple input) {
+        String componentId = taskIdToComponentId.get(sourceTask);
+        Set<String> inputSpouts = getInputSpoutsForBolt(topology, componentId, null);
+
+        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
+        EventType type = ctrlEvent.getEventType();
+        Set<Integer> notifyList = new TreeSet<Integer>();
+        Values values = null;
+        TargetBackpressureInfo info = null;
+        boolean update = false;
+        if (type.equals(EventType.startBackpressure)) {
+            int flowCtrlTime = (Integer) ctrlEvent.getEventValue().get(0);
+            for (String spout : inputSpouts) {
+                info = getBackpressureInfoBySourceSpout(spout, componentId, true);
+                SourceBackpressureInfo sourceInfo = SourceTobackpressureInfo.get(spout);
+                // NOTE(review): 'update' is overwritten on every loop iteration, so only
+                // the last input spout decides whether the status is reported -- confirm intended.
+                update = info.getTasks().add(sourceTask);
+                boolean add = false;
+                if (System.currentTimeMillis() - sourceInfo.getLastestTimeStamp() > period) {
+                    // No recent request recorded for this spout: always take this one
+                    add = true;
+                } else {
+                    // A recent request exists. Re-trigger only when the latest event was
+                    // not already a start request, or when the reported flow-control time
+                    // grew noticeably (by more than adjustedTime).
+                    EventType lastestBpEvent = sourceInfo.getLastestBackpressureEvent();
+                    if (lastestBpEvent != null && !lastestBpEvent.equals(EventType.startBackpressure)) {
+                        add = true;
+                    }
+
+                    int maxFlowCtrlTime = sourceInfo.getMaxFlowCtrlTime();
+                    if ((flowCtrlTime - maxFlowCtrlTime > adjustedTime || maxFlowCtrlTime == -1) &&
+                            flowCtrlTime >= 0) {
+                        add = true;
+                    }
+                }
+                info.setFlowCtrlTime(flowCtrlTime);
+                info.setBackpressureStatus(type);
+
+                if (add) {
+                    info.setTimeStamp(System.currentTimeMillis());
+                    // Only when the number of bolt tasks sending requests reaches the
+                    // configured ratio does the coordinator forward the backpressure
+                    // request to the spouts. This avoids over-control caused by very
+                    // few slow tasks.
+                    double taskBpRatio = Double.valueOf(info.getTasks().size()) / Double.valueOf(context.getComponentTasks(componentId).size());
+                    if (taskBpRatio >= triggerBpRatio) {
+                        // new TreeSet<>(...) can never be null, so no null check is needed
+                        Set<Integer> spoutTasks = new TreeSet<Integer>(context.getComponentTasks(spout));
+                        SourceTobackpressureInfo.get(spout).getTasks().addAll(spoutTasks);
+                        notifyList.addAll(spoutTasks);
+                    } else {
+                        update = false;
+                    }
+                } else {
+                    update = false;
+                }
+            }
+
+            // Bugfix: guard against an empty inputSpouts set, which left 'info' null
+            // and caused an NPE here. With no spouts, notifyList is empty and no
+            // message would be sent anyway.
+            if (info != null) {
+                List<Object> value = new ArrayList<Object>();
+                value.add(info.getFlowCtrlTime());
+                TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(EventType.startBackpressure, value);
+                values = new Values(startBp);
+            }
+        } else if (type.equals(EventType.stopBackpressure)) {
+            for (String spout : inputSpouts) {
+                info = getBackpressureInfoBySourceSpout(spout, componentId, false);
+                SourceBackpressureInfo sourceInfo = SourceTobackpressureInfo.get(spout);
+                if (info != null) {
+                    Set<Integer> tasks = info.getTasks();
+                    if (tasks != null && tasks.remove(sourceTask)) {
+                        update = true;
+                    }
+                }
+
+                if (sourceInfo != null && checkIntervalExpired(sourceInfo.getLastestTimeStamp())) {
+                    Set<Integer> spoutTasks = new TreeSet<Integer>(context.getComponentTasks(spout));
+                    notifyList.addAll(spoutTasks);
+                    // Bugfix: 'info' may be null here (no record yet for this component);
+                    // the original code dereferenced it unconditionally.
+                    if (info != null) {
+                        info.setTimeStamp(System.currentTimeMillis());
+                        info.setBackpressureStatus(type);
+                    }
+                }
+            }
+
+            // Release the reporting bolt task itself once none of its source spouts
+            // is under backpressure anymore.
+            if (!checkSpoutsUnderBackpressure(inputSpouts)) {
+                notifyList.add(sourceTask);
+            }
+
+            TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
+            values = new Values(stopBp);
+        } else {
+            LOG.warn("Received unknown event " + type.toString());
+        }
+
+        sendBackpressureMessage(notifyList, values, type);
+
+        // If task set under backpressure has been changed, report the latest status
+        if (update) {
+            LOG.info("inputspouts=" + inputSpouts + " for " + componentId + "-" + sourceTask + ", eventType=" + type.toString());
+            reportBackpressureStatus();
+        }
+    }
+
+    /**
+     * Collects every task id currently involved in backpressure: the spout
+     * tasks under flow control plus the bolt tasks that requested it.
+     */
+    private Set<Integer> getTasksUnderBackpressure() {
+        Set<Integer> result = new HashSet<Integer>();
+
+        for (SourceBackpressureInfo sourceInfo : SourceTobackpressureInfo.values()) {
+            if (sourceInfo.getTasks().isEmpty()) {
+                continue;
+            }
+            result.addAll(sourceInfo.getTasks());
+            for (TargetBackpressureInfo targetInfo : sourceInfo.getTargetTasks().values()) {
+                result.addAll(targetInfo.getTasks());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Publishes the current backpressure state: records a human-readable
+     * summary through the task-error channel and stores the full info map
+     * in ZK.
+     */
+    private void reportBackpressureStatus() {
+        try {
+            Set<Integer> underTasks = getTasksUnderBackpressure();
+            StringBuilder report = new StringBuilder(BACKPRESSURE_TAG);
+            if (underTasks.isEmpty()) {
+                report.append("closed ");
+            } else {
+                report.append("opened: ").append(underTasks);
+            }
+            String message = report.toString();
+            zkCluster.report_task_error(context.getTopologyId(), context.getThisTaskId(), message, BACKPRESSURE_TAG);
+            zkCluster.set_backpressure_info(context.getTopologyId(), SourceTobackpressureInfo);
+            LOG.info(message);
+        } catch (Exception e) {
+            LOG.error("can't update backpressure state ", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java
new file mode 100644
index 0000000..0f2df95
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/BackpressureTrigger.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.DisruptorQueue;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.task.Task;
+import com.alibaba.jstorm.task.execute.BoltExecutors;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+import com.alibaba.jstorm.utils.IntervalCheck;
+
+/**
+ * Responsible for checking if back pressure shall be triggered. 
+ * When heavy load (the size of the queue monitored reaches high water mark), start back pressure, 
+ * and when load goes down, stop back pressure.
+ *         
+ * @author Basti Liu
+ */
+public class BackpressureTrigger extends Backpressure {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureTrigger.class);
+
+    private Task task;
+    private int taskId;
+
+    // Queue which is going to be monitored
+    private DisruptorQueue exeQueue;
+    private DisruptorQueue recvQueue;
+
+    private BoltExecutors boltExecutor;
+
+    private volatile boolean isUnderBackpressure = false;
+
+    private IntervalCheck intervalCheck;
+
+    OutputCollector output;
+
+    private List<EventType> samplingSet;
+    private double triggerSampleRate;
+
+    public BackpressureTrigger(Task task, BoltExecutors boltExecutor, Map stormConf, OutputCollector output) {
+        super(stormConf);
+
+        this.task = task;
+        this.taskId = task.getTaskId();
+
+        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(stormConf);
+        int smapleInterval = sampleNum * (ConfigExtension.getBackpressureCheckIntervl(stormConf));
+        this.intervalCheck = new IntervalCheck();
+        this.intervalCheck.setIntervalMs(smapleInterval);
+        this.intervalCheck.start();
+
+        this.samplingSet = new ArrayList<EventType>();
+        this.triggerSampleRate = ConfigExtension.getBackpressureTriggerSampleRate(stormConf);
+
+        this.output = output;
+
+        this.boltExecutor = boltExecutor;
+
+        try {
+            StormClusterState zkCluster = task.getZkCluster();
+            Map<String, SourceBackpressureInfo> backpressureInfo = zkCluster.get_backpressure_info(task.getTopologyId());
+            if (backpressureInfo != null) {
+                for (Entry<String, SourceBackpressureInfo> entry : backpressureInfo.entrySet()) {
+                    SourceBackpressureInfo info = entry.getValue();
+                    Map<String, TargetBackpressureInfo> targetInfoMap = info.getTargetTasks();
+                    if (targetInfoMap != null) {
+                        TargetBackpressureInfo targetInfo = targetInfoMap.get(task.getComponentId());
+                        if (targetInfo != null && targetInfo.getTasks().contains(taskId)) {
+                            isBackpressureEnable = true;
+                            LOG.info("Retrieved backpressure info for task-" + taskId);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.info("Failed to get backpressure info from zk", e);
+        }
+        LOG.info("Finished BackpressureTrigger init, highWaterMark=" + highWaterMark + ", lowWaterMark=" + lowWaterMark + ", sendInterval="
+                + intervalCheck.getInterval());
+    }
+
+    public void checkAndTrigger() {
+        if (isBackpressureEnable == false) {
+            return;
+        }
+
+        if (exeQueue == null || recvQueue == null) {
+            exeQueue = task.getExecuteQueue();
+            recvQueue = task.getDeserializeQueue();
+            
+            if (exeQueue == null) {
+                LOG.info("Init of excutor-task-" + taskId + " has not been finished!");
+                return;
+            }
+            if (recvQueue == null) {
+                LOG.info("Init of receiver-task-" + taskId + " has not been finished!");
+                return;
+            }
+        }
+
+        LOG.debug("Backpressure Check: exeQueue load=" + (exeQueue.pctFull() * 100) + ", recvQueue load=" + (recvQueue.pctFull() * 100));
+        if (exeQueue.pctFull() > highWaterMark) {
+            samplingSet.add(EventType.startBackpressure);
+        } else if (exeQueue.pctFull() <= lowWaterMark) {
+            samplingSet.add(EventType.stopBackpressure);
+        } else {
+            samplingSet.add(EventType.defaultType);
+        }
+
+        if (intervalCheck.check()) {
+            int startCount = 0, stopCount = 0;
+
+            for (EventType eventType : samplingSet) {
+                if (eventType.equals(EventType.startBackpressure)) {
+                    startCount++;
+                } else if (eventType.equals(EventType.stopBackpressure)) {
+                    stopCount++;
+                }
+            }
+
+            if (startCount > stopCount) {
+                if (sampleRateCheck(startCount)) {
+                    startBackpressure();
+                    isUnderBackpressure = true;
+                }
+            } else {
+                if (sampleRateCheck(stopCount) && isUnderBackpressure == true) {
+                    stopBackpressure();
+                }
+            }
+
+            samplingSet.clear();
+        }
+    }
+
+    private boolean sampleRateCheck(double count) {
+        double sampleRate = count / samplingSet.size();
+        if (sampleRate > triggerSampleRate)
+            return true;
+        else
+            return false;
+    }
+
+    public void handle(Tuple input) {
+        try {
+            TopoMasterCtrlEvent event = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
+            EventType type = event.getEventType();
+            if (type.equals(EventType.stopBackpressure)) {
+                isUnderBackpressure = false;
+                LOG.info("Received stop backpressure event for task-" + task.getTaskId());
+            } else if (type.equals(EventType.updateBackpressureConfig)) {
+                Map stormConf = (Map) event.getEventValue().get(0);
+                updateConfig(stormConf);
+
+                if (isBackpressureEnable == false) {
+                    LOG.info("Disable backpressure in trigger.");
+                    isUnderBackpressure = false;
+                    samplingSet.clear();
+                } else {
+                    LOG.info("Enable backpressure in trigger.");
+                }
+            } else {
+                LOG.info("Received unexpected event, " + type.toString());
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to handle event", e);
+        }
+    }
+
+    private void startBackpressure() {
+        List<Object> value = new ArrayList<Object>();
+        Double flowCtrlTime = Double.valueOf(boltExecutor.getExecuteTime() / 1000);
+        value.add(flowCtrlTime.intValue());
+        TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(EventType.startBackpressure, value);
+        output.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(startBp));
+        LOG.debug("Send start backpressure request for task-{}, flowCtrlTime={}", taskId, flowCtrlTime.intValue());
+    }
+
+    private void stopBackpressure() {
+        TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
+        output.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopBp));
+        LOG.debug("Send stop backpressure request for task-{}", taskId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java
new file mode 100644
index 0000000..05f7d11
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/SourceBackpressureInfo.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+public class SourceBackpressureInfo implements Serializable {
+    private static final long serialVersionUID = -8213491092461721871L;
+
+    // Spout tasks of this source component currently under backpressure
+    private Set<Integer> tasks;
+
+    // Backpressure info per requesting target component: Map<componentId, info>
+    private Map<String, TargetBackpressureInfo> targetTasks;
+
+    public SourceBackpressureInfo() {
+        tasks = new TreeSet<Integer>();
+        targetTasks = new HashMap<String, TargetBackpressureInfo>();
+    }
+
+    public Set<Integer> getTasks() {
+        return tasks;
+    }
+
+    public Map<String, TargetBackpressureInfo> getTargetTasks() {
+        return targetTasks;
+    }
+
+    /**
+     * Returns the newest timestamp among all target infos, or 0 when none is set.
+     */
+    public long getLastestTimeStamp() {
+        long latest = 0;
+        for (TargetBackpressureInfo info : targetTasks.values()) {
+            if (info.getTimeStamp() > latest) {
+                latest = info.getTimeStamp();
+            }
+        }
+        return latest;
+    }
+
+    /**
+     * Returns the event type of the target info with the newest timestamp,
+     * or null when no target info exists.
+     */
+    public EventType getLastestBackpressureEvent() {
+        EventType latestEvent = null;
+        long latestTime = 0;
+        for (TargetBackpressureInfo info : targetTasks.values()) {
+            if (info.getTimeStamp() > latestTime) {
+                latestTime = info.getTimeStamp();
+                latestEvent = info.getBackpressureStatus();
+            }
+        }
+        return latestEvent;
+    }
+
+    /**
+     * Returns the largest flow-control time among all target infos, or 0 when
+     * none reports a positive value.
+     */
+    public int getMaxFlowCtrlTime() {
+        int max = 0;
+        for (TargetBackpressureInfo info : targetTasks.values()) {
+            if (info.getFlowCtrlTime() > max) {
+                max = info.getFlowCtrlTime();
+            }
+        }
+        return max;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java
new file mode 100644
index 0000000..2f6332b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/backpressure/TargetBackpressureInfo.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.backpressure;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
+
+public class TargetBackpressureInfo implements Serializable {
+    private static final long serialVersionUID = -1829897435773792484L;
+
+    // Bolt tasks of the target component that requested backpressure
+    private Set<Integer> tasks;
+
+    // Latest backpressure event recorded for this target
+    private EventType backpressureStatus;
+    // Flow-control time reported by the target; -1 means "not set"
+    private int flowCtrlTime;
+    // Time of the most recent update, in epoch millis; 0 means "never"
+    private long timeStamp;
+
+    public TargetBackpressureInfo() {
+        this(EventType.defaultType, -1, 0L);
+    }
+
+    public TargetBackpressureInfo(EventType backpressureStatus, int flowCtrlTime, long time) {
+        this.tasks = new TreeSet<Integer>();
+        this.backpressureStatus = backpressureStatus;
+        this.flowCtrlTime = flowCtrlTime;
+        this.timeStamp = time;
+    }
+
+    public Set<Integer> getTasks() {
+        return tasks;
+    }
+
+    public EventType getBackpressureStatus() {
+        return this.backpressureStatus;
+    }
+
+    public void setBackpressureStatus(EventType status) {
+        this.backpressureStatus = status;
+    }
+
+    public long getTimeStamp() {
+        return this.timeStamp;
+    }
+
+    public void setTimeStamp(long time) {
+        this.timeStamp = time;
+    }
+
+    public int getFlowCtrlTime() {
+        return this.flowCtrlTime;
+    }
+
+    public void setFlowCtrlTime(int time) {
+        this.flowCtrlTime = time;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}