You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:39 UTC

[4/9] storm git commit: STORM-2306 - Messaging subsystem redesign. New Backpressure model.

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 8610040..63ac1a5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,12 +19,20 @@
 package org.apache.storm.executor.bolt;
 
 import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.function.BooleanSupplier;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
-
+import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.metrics.BuiltinBoltMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.daemon.Task;
 import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
 import org.apache.storm.daemon.worker.WorkerState;
@@ -32,54 +40,94 @@ import org.apache.storm.executor.Executor;
 import org.apache.storm.hooks.info.BoltExecuteInfo;
 import org.apache.storm.metric.api.IMetricsRegistrant;
 import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
+import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.task.IBolt;
-import org.apache.storm.task.IOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.JCQueue.ExitCondition;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-
 public class BoltExecutor extends Executor {
 
     private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
 
-    private final Callable<Boolean> executeSampler;
+    private final BooleanSupplier executeSampler;
+    private final boolean isSystemBoltExecutor;
+    private final IWaitStrategy consumeWaitStrategy;       // employed when no incoming data
+    private final IWaitStrategy backPressureWaitStrategy;  // employed when outbound path is congested
+    private BoltOutputCollectorImpl outputCollector;
+    private final BoltExecutorStats stats;
+    private final BuiltinMetrics builtInMetrics;
 
     public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
-        super(workerData, executorId, credentials);
+        super(workerData, executorId, credentials, StatsUtil.BOLT);
         this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
+        this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
+        if (isSystemBoltExecutor) {
+            this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
+        } else {
+            this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
+            this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
+        }
+        this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
+        this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
+        this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+        this.builtInMetrics = new BuiltinBoltMetrics(stats);
     }
 
-    public void init(Map<Integer, Task> idToTask) {
+    @Override
+    public BoltExecutorStats getStats() {
+        return stats;
+    }
+
+    private static IWaitStrategy makeSystemBoltWaitStrategy() {
+        WaitStrategyPark ws = new WaitStrategyPark();
+        HashMap conf = new HashMap<String, Object>();
+        conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
+        ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
+        return ws;
+    }
+
+    public void init(ArrayList<Task> idToTask, int idToTaskBase) {
+        executorTransfer.initLocalRecvQueues();
         while (!stormActive.get()) {
             Utils.sleep(100);
         }
 
-        this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext());
-
-        LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
-        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
-            Task taskData = entry.getValue();
+        if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System bolt doesn't call reportError()
+            this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
+        }
+        LOG.info("Preparing bolt {}:{}", componentId, getTaskIds());
+        for (Task taskData : idToTask) {
+            if (taskData == null) {
+                //This happens if the min id is too small
+                continue;
+            }
             IBolt boltObject = (IBolt) taskData.getTaskObject();
             TopologyContext userContext = taskData.getUserContext();
-            taskData.getBuiltInMetrics().registerAll(topoConf, userContext);
+            builtInMetrics.registerAll(topoConf, userContext);
             if (boltObject instanceof ICredentialsListener) {
                 ((ICredentialsListener) boltObject).setCredentials(credentials);
             }
             if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
-                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue,
-                        "transfer", workerData.getTransferQueue());
+                Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue());
                 BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
 
-                Map cachedNodePortToSocket = (Map) workerData.getCachedNodeToPortSocket().get();
+                Map cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
                 BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
                 BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
 
@@ -93,46 +141,92 @@ public class BoltExecutor extends Executor {
                     }
                 }
             } else {
-                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
+                Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
                 BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
             }
 
-            IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug);
+            this.outputCollector = new BoltOutputCollectorImpl(this, taskData, rand, hasEventLoggers, ackingEnabled, isDebug);
             boltObject.prepare(topoConf, userContext, new OutputCollector(outputCollector));
         }
         openOrPrepareWasCalled.set(true);
-        LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+        LOG.info("Prepared bolt {}:{}", componentId, taskIds);
         setupTicks(false);
         setupMetrics();
     }
 
     @Override
-    public Callable<Object> call() throws Exception {
-        init(idToTask);
+    public Callable<Long> call() throws Exception {
+        init(idToTask, idToTaskBase);
+
+        return new Callable<Long>() {
+            private ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();
+            int bpIdleCount = 0;
+            int consumeIdleCounter = 0;
 
-        return new Callable<Object>() {
             @Override
-            public Object call() throws Exception {
-                receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+            public Long call() throws Exception {
+                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
+                if (pendingEmitsIsEmpty) {
+                    if (bpIdleCount != 0) {
+                        LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
+                    }
+                    bpIdleCount = 0;
+                    int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
+                    if (consumeCount == 0) {
+                        if (consumeIdleCounter == 0) {
+                            LOG.debug("Invoking consume wait strategy");
+                        }
+                        consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
+                        if (Thread.interrupted()) {
+                            throw new InterruptedException();
+                        }
+                    } else {
+                        if (consumeIdleCounter != 0) {
+                            LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
+                        }
+                        consumeIdleCounter = 0;
+                    }
+                } else {
+                    if (bpIdleCount == 0) { // check avoids multiple log msgs when spinning in an idle loop
+                        LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
+                    }
+                    bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
+                }
+
                 return 0L;
             }
+
+            // returns true if pendingEmits is empty
+            private boolean tryFlushPendingEmits() {
+                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
+                    if (executorTransfer.tryTransfer(t, null)) {
+                        pendingEmits.poll();
+                    } else { // to avoid reordering of emits, stop at first failure
+                        return false;
+                    }
+                }
+                return true;
+            }
+
         };
     }
 
     @Override
     public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
         String streamId = tuple.getSourceStreamId();
-        if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
-            Object taskObject = idToTask.get(taskId).getTaskObject();
+        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
+            outputCollector.flush();
+        } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
+            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
+        } else if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
+            Object taskObject = idToTask.get(taskId - idToTaskBase).getTaskObject();
             if (taskObject instanceof ICredentialsListener) {
                 ((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
             }
-        } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
-            metricsTick(idToTask.get(taskId), tuple);
         } else {
-            IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
-            boolean isSampled = sampler.call();
-            boolean isExecuteSampler = executeSampler.call();
+            IBolt boltObject = (IBolt) idToTask.get(taskId - idToTaskBase).getTaskObject();
+            boolean isSampled = sampler.getAsBoolean();
+            boolean isExecuteSampler = executeSampler.getAsBoolean();
             Long now = (isSampled || isExecuteSampler) ? Time.currentTimeMillis() : null;
             if (isSampled) {
                 tuple.setProcessSampleStartTime(now);
@@ -147,11 +241,14 @@ public class BoltExecutor extends Executor {
             if (isDebug) {
                 LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
             }
-            new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
+            TopologyContext topologyContext = idToTask.get(taskId - idToTaskBase).getUserContext();
+            if (!topologyContext.getHooks().isEmpty()) // perf critical check to avoid unnecessary allocation
+            {
+                new BoltExecuteInfo(tuple, taskId, delta).applyOn(topologyContext);
+            }
             if (delta >= 0) {
-                ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+                stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index 696447e..facbb75 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.executor.bolt;
 
 import java.util.Collection;
@@ -23,18 +24,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+
 import org.apache.storm.daemon.Acker;
 import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.ExecutorTransfer;
 import org.apache.storm.hooks.info.BoltAckInfo;
 import org.apache.storm.hooks.info.BoltFailInfo;
 import org.apache.storm.stats.BoltExecutorStats;
 import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,43 +47,60 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
     private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
 
     private final BoltExecutor executor;
-    private final Task taskData;
+    private final Task task;
     private final int taskId;
     private final Random random;
     private final boolean isEventLoggers;
+    private final ExecutorTransfer xsfer;
+    private boolean ackingEnabled;
     private final boolean isDebug;
 
-    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
-                                   boolean isEventLoggers, boolean isDebug) {
+    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, Random random,
+                                   boolean isEventLoggers, boolean ackingEnabled, boolean isDebug) {
         this.executor = executor;
-        this.taskData = taskData;
-        this.taskId = taskId;
+        this.task = taskData;
+        this.taskId = taskData.getTaskId();
         this.random = random;
         this.isEventLoggers = isEventLoggers;
+        this.ackingEnabled = ackingEnabled;
         this.isDebug = isDebug;
+        this.xsfer = executor.getExecutorTransfer();
     }
 
     public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-        return boltEmit(streamId, anchors, tuple, null);
+        try {
+            return boltEmit(streamId, anchors, tuple, null);
+        } catch (InterruptedException e) {
+            LOG.warn("Thread interrupted when emitting tuple.");
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-        boltEmit(streamId, anchors, tuple, taskId);
+        try {
+            boltEmit(streamId, anchors, tuple, taskId);
+        } catch (InterruptedException e) {
+            LOG.warn("Thread interrupted when emitting tuple.");
+            throw new RuntimeException(e);
+        }
     }
 
-    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values,
+                                   Integer targetTaskId) throws InterruptedException {
         List<Integer> outTasks;
         if (targetTaskId != null) {
-            outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+            outTasks = task.getOutgoingTasks(targetTaskId, streamId, values);
         } else {
-            outTasks = taskData.getOutgoingTasks(streamId, values);
+            outTasks = task.getOutgoingTasks(streamId, values);
         }
 
-        for (Integer t : outTasks) {
-            Map<Long, Long> anchorsToIds = new HashMap<>();
-            if (anchors != null) {
-                for (Tuple a : anchors) {
+        for (int i = 0; i < outTasks.size(); ++i) {
+            Integer t = outTasks.get(i);
+            MessageId msgId;
+            if (ackingEnabled && anchors != null) {
+                final Map<Long, Long> anchorsToIds = new HashMap<>();
+                for (Tuple a : anchors) {  // perf critical path. would be nice to avoid iterator allocation here and below
                     Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
                     if (rootIds.size() > 0) {
                         long edgeId = MessageId.generateId(random);
@@ -89,54 +110,63 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
                         }
                     }
                 }
+                msgId = MessageId.makeId(anchorsToIds);
+            } else {
+                msgId = MessageId.makeUnanchored();
             }
-            MessageId msgId = MessageId.makeId(anchorsToIds);
-            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
-            executor.getExecutorTransfer().transfer(t, tupleExt);
+            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId);
+            xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits());
         }
         if (isEventLoggers) {
-            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+            task.sendToEventLogger(executor, values, executor.getComponentId(), null, random, executor.getPendingEmits());
         }
         return outTasks;
     }
 
     @Override
     public void ack(Tuple input) {
+        if (!ackingEnabled) {
+            return;
+        }
         long ackValue = ((TupleImpl) input).getAckVal();
         Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
         for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
-            executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
-                    new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
-                    executor.getExecutorTransfer());
+            task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
+                new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
+                executor.getExecutorTransfer(), executor.getPendingEmits());
         }
         long delta = tupleTimeDelta((TupleImpl) input);
         if (isDebug) {
             LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
         }
-        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
-        boltAckInfo.applyOn(taskData.getUserContext());
+
+        if (!task.getUserContext().getHooks().isEmpty()) {
+            BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+            boltAckInfo.applyOn(task.getUserContext());
+        }
         if (delta >= 0) {
-            ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
-                    input.getSourceComponent(), input.getSourceStreamId(), delta);
+            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
     }
 
     @Override
     public void fail(Tuple input) {
+        if (!ackingEnabled) {
+            return;
+        }
         Set<Long> roots = input.getMessageId().getAnchors();
         for (Long root : roots) {
-            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
-                    new Values(root), executor.getExecutorTransfer());
+            task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
+                new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
         }
         long delta = tupleTimeDelta((TupleImpl) input);
         if (isDebug) {
             LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
         }
         BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
-        boltFailInfo.applyOn(taskData.getUserContext());
+        boltFailInfo.applyOn(task.getUserContext());
         if (delta >= 0) {
-            ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
-                    input.getSourceComponent(), input.getSourceStreamId(), delta);
+            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
     }
 
@@ -144,8 +174,18 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
     public void resetTimeout(Tuple input) {
         Set<Long> roots = input.getMessageId().getAnchors();
         for (Long root : roots) {
-            executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
-                    new Values(root), executor.getExecutorTransfer());
+            task.sendUnanchored(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, new Values(root),
+                executor.getExecutorTransfer(), executor.getPendingEmits());
+        }
+    }
+
+    @Override
+    public void flush() {
+        try {
+            xsfer.flush();
+        } catch (InterruptedException e) {
+            LOG.warn("Bolt thread interrupted during flush()");
+            throw new RuntimeException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 59139b1..5f6773c 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -18,27 +18,36 @@
 package org.apache.storm.executor.spout;
 
 import com.google.common.collect.ImmutableMap;
+
 import java.util.concurrent.Callable;
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.ICredentialsListener;
 import org.apache.storm.daemon.Acker;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
 import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.BuiltinSpoutMetrics;
 import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.Executor;
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.hooks.info.SpoutAckInfo;
 import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutWaitStrategy;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.DisruptorQueue;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
@@ -57,6 +66,7 @@ public class SpoutExecutor extends Executor {
     private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
 
     private final ISpoutWaitStrategy spoutWaitStrategy;
+    private final IWaitStrategy backPressureWaitStrategy;
     private Integer maxSpoutPending;
     private final AtomicBoolean lastActive;
     private List<ISpout> spouts;
@@ -66,33 +76,46 @@ public class SpoutExecutor extends Executor {
     private final SpoutThrottlingMetrics spoutThrottlingMetrics;
     private final boolean hasAckers;
     private RotatingMap<Long, TupleInfo> pending;
-    private final boolean backPressureEnabled;
+    SpoutOutputCollectorImpl spoutOutputCollector;
+    private final SpoutExecutorStats stats;
+    private final BuiltinMetrics builtInMetrics;
+    private long threadId = 0;
 
     public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
-        super(workerData, executorId, credentials);
+        super(workerData, executorId, credentials, StatsUtil.SPOUT);
         this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
         this.spoutWaitStrategy.prepare(topoConf);
-
-        this.backPressureEnabled = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+        this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
+        this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
 
         this.lastActive = new AtomicBoolean(false);
         this.hasAckers = StormCommon.hasAckers(topoConf);
         this.emittedCount = new MutableLong(0);
         this.emptyEmitStreak = new MutableLong(0);
         this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+        this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+        this.builtInMetrics = new BuiltinSpoutMetrics(stats);
     }
 
-    public void init(final Map<Integer, Task> idToTask) {
+    @Override
+    public SpoutExecutorStats getStats() {
+        return stats;
+    }
+
+    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
+        this.threadId = Thread.currentThread().getId();
+        executorTransfer.initLocalRecvQueues();
         while (!stormActive.get()) {
             Utils.sleep(100);
         }
 
-        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        LOG.info("Opening spout {}:{}", componentId, taskIds );
         this.idToTask = idToTask;
         this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
         this.spouts = new ArrayList<>();
-        for (Task task : idToTask.values()) {
-            this.spouts.add((ISpout) task.getTaskObject());
+        for (Task task : idToTask) {
+            if(task!=null)
+                this.spouts.add((ISpout) task.getTaskObject());
         }
         this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
             @Override
@@ -101,24 +124,27 @@ public class SpoutExecutor extends Executor {
                 if (tupleInfo.getTimestamp() != 0) {
                     timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
                 }
-                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
             }
         });
 
-        this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext());
-        this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext());
+        this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
+        this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
         this.outputCollectors = new ArrayList<>();
-        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
-            Task taskData = entry.getValue();
+        for (int i=0; i<idToTask.size(); ++i) {
+            Task taskData = idToTask.get(i);
+            if (taskData==null) {
+                continue;
+            }
             ISpout spoutObject = (ISpout) taskData.getTaskObject();
-            SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl(
-                    spoutObject, this, taskData, entry.getKey(), emittedCount,
+            spoutOutputCollector = new SpoutOutputCollectorImpl(
+                    spoutObject, this, taskData, emittedCount,
                     hasAckers, rand, hasEventLoggers, isDebug, pending);
             SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
             this.outputCollectors.add(outputCollector);
 
-            taskData.getBuiltInMetrics().registerAll(topoConf, taskData.getUserContext());
-            Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
+            builtInMetrics.registerAll(topoConf, taskData.getUserContext());
+            Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
             BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
 
             if (spoutObject instanceof ICredentialsListener) {
@@ -127,75 +153,152 @@ public class SpoutExecutor extends Executor {
             spoutObject.open(topoConf, taskData.getUserContext(), outputCollector);
         }
         openOrPrepareWasCalled.set(true);
-        LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+        LOG.info("Opened spout {}:{}", componentId, taskIds);
         setupTicks(true);
         setupMetrics();
     }
 
     @Override
-    public Callable<Object> call() throws Exception {
-        init(idToTask);
-
-        return new Callable<Object>() {
+    public Callable<Long> call() throws Exception {
+        init(idToTask, idToTaskBase);
+        return new Callable<Long>() {
+            int recvqCheckSkips = 0;
+            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
+            int bpIdleCount = 0;
+            int rmspCount = 0;
             @Override
-            public Object call() throws Exception {
-                receiveQueue.consumeBatch(SpoutExecutor.this);
-
-                final long currCount = emittedCount.get();
-                final boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
-                final boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
-                final boolean isActive = stormActive.get();
-                if (isActive) {
-                    if (!lastActive.get()) {
-                        lastActive.set(true);
-                        LOG.info("Activating spout {}:{}", componentId, idToTask.keySet());
-                        for (ISpout spout : spouts) {
-                            spout.activate();
-                        }
-                    }
-                    if (!sendQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
-                        for (ISpout spout : spouts) {
-                            spout.nextTuple();
-                        }
+            public Long call() throws Exception {
+                int receiveCount = 0;
+                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
+                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
+                    recvqCheckSkips = 0;
+                }
+                long currCount = emittedCount.get();
+                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
+                boolean isActive = stormActive.get();
+
+                if (!isActive) {
+                    inactiveExecute();
+                    return 0L;
+                }
+
+                if (!lastActive.get()) {
+                    lastActive.set(true);
+                    activateSpouts();
+                }
+                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
+                boolean noEmits = true;
+                long emptyStretch = 0;
+
+                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
+                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
+                        spouts.get(j).nextTuple();
                     }
-                } else {
-                    if (lastActive.get()) {
-                        lastActive.set(false);
-                        LOG.info("Deactivating spout {}:{}", componentId, idToTask.keySet());
-                        for (ISpout spout : spouts) {
-                            spout.deactivate();
-                        }
+                    noEmits = (currCount == emittedCount.get());
+                    if (noEmits) {
+                        emptyEmitStreak.increment();
+                    } else {
+                        emptyStretch = emptyEmitStreak.get();
+                        emptyEmitStreak.set(0);
                     }
-                    long start = Time.currentTimeMillis();
-                    Time.sleep(100);
-                    spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
                 }
-                if (currCount == emittedCount.get() && isActive) {
-                    emptyEmitStreak.increment();
-                    long start = Time.currentTimeMillis();
-                    spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
-                    if (throttleOn) {
-                        spoutThrottlingMetrics.skippedThrottleMs(Time.currentTimeMillis() - start);
-                    } else if (reachedMaxSpoutPending) {
-                        spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
-                    }
+                if (reachedMaxSpoutPending) {
+                    if (rmspCount == 0)
+                        LOG.debug("Reached max spout pending");
+                    rmspCount++;
                 } else {
-                    emptyEmitStreak.set(0);
+                    if (rmspCount > 0)
+                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
+                    rmspCount = 0;
+                }
+
+                if (receiveCount > 1) {
+                    // continue without idling
+                    return 0L;
+                }
+                if ( !pendingEmits.isEmpty() ) { // then facing backpressure
+                    backPressureWaitStrategy();
+                    return 0L;
+                }
+                bpIdleCount = 0;
+                if (noEmits) {
+                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
+                    return 0L;
                 }
                 return 0L;
             }
+
+            private void backPressureWaitStrategy() throws InterruptedException {
+                long start = Time.currentTimeMillis();
+                if (bpIdleCount == 0) { // check avoids multiple log msgs when in an idle loop
+                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
+                }
+                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
+                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
+            }
+
+            // returns true if pendingEmits is empty
+            private boolean tryFlushPendingEmits() {
+                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
+                    if (executorTransfer.tryTransfer(t, null)) {
+                        pendingEmits.poll();
+                    } else { // to avoid reordering of emits, stop at first failure
+                        return false;
+                    }
+                }
+                return true;
+            }
         };
     }
 
+    private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) {
+        emptyEmitStreak.increment();
+        long start = Time.currentTimeMillis();
+        spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
+        if (reachedMaxSpoutPending) {
+            spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
+        } else {
+            if (emptyStretch > 0) {
+                LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
+            }
+        }
+    }
+
+    private void activateSpouts() {
+        LOG.info("Activating spout {}:{}", componentId, taskIds);
+        for (ISpout spout : spouts) {
+            spout.activate();
+        }
+    }
+
+    private void deactivateSpouts() {
+        LOG.info("Deactivating spout {}:{}", componentId, taskIds);
+        for (ISpout spout : spouts) {
+            spout.deactivate();
+        }
+    }
+
+    private void inactiveExecute() throws InterruptedException {
+        if (lastActive.get()) {
+            lastActive.set(false);
+            deactivateSpouts();
+        }
+        long start = Time.currentTimeMillis();
+        Time.sleep(100);
+        spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
+    }
+
     @Override
     public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
         String streamId = tuple.getSourceStreamId();
-        if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
+            spoutOutputCollector.flush();
+        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
             pending.rotate();
         } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
-            metricsTick(idToTask.get(taskId), tuple);
+            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
         } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
-            Object spoutObj = idToTask.get(taskId).getTaskObject();
+            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
             if (spoutObj instanceof ICredentialsListener) {
                 ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
             }
@@ -208,26 +311,28 @@ public class SpoutExecutor extends Executor {
         } else {
             Long id = (Long) tuple.getValue(0);
             Long timeDeltaMs = (Long) tuple.getValue(1);
-            TupleInfo tupleInfo = (TupleInfo) pending.remove(id);
+            TupleInfo tupleInfo = pending.remove(id);
             if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                 if (taskId != tupleInfo.getTaskId()) {
                     throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                 }
-                long startTimeMs = tupleInfo.getTimestamp();
                 Long timeDelta = null;
-                if (startTimeMs != 0) {
-                    timeDelta = timeDeltaMs;
+                if (hasAckers) {
+                    long startTimeMs = tupleInfo.getTimestamp();
+                    if (startTimeMs != 0) {
+                        timeDelta = timeDeltaMs;
+                    }
                 }
                 if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
-                    ackSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo);
+                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                 } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
-                    failSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
+                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                 }
             }
         }
     }
 
-    public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
+    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
         try {
             ISpout spout = (ISpout) taskData.getTaskObject();
             int taskId = taskData.getTaskId();
@@ -235,16 +340,18 @@ public class SpoutExecutor extends Executor {
                 LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
             }
             spout.ack(tupleInfo.getMessageId());
-            new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
-            if (timeDelta != null) {
-                ((SpoutExecutorStats) executor.getStats()).spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
+                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+            }
+            if (hasAckers && timeDelta!=null) {
+                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
         }
     }
 
-    public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
+    public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
         try {
             ISpout spout = (ISpout) taskData.getTaskObject();
             int taskId = taskData.getTaskId();
@@ -254,10 +361,21 @@ public class SpoutExecutor extends Executor {
             spout.fail(tupleInfo.getMessageId());
             new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
             if (timeDelta != null) {
-                ((SpoutExecutorStats) executor.getStats()).spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta);
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
         }
     }
+
+
+    public int getSpoutRecvqCheckSkipCount() {
+        if(ackingEnabled)
+            return 0; // always check recQ if ACKing enabled
+        return ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0);
+    }
+
+    public long getThreadId() {
+        return threadId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
index 1d78d57..11acbc2 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -22,19 +22,25 @@ import org.apache.storm.daemon.Task;
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+/**
+ *   Methods are not thread-safe. Each thread is expected to have a separate instance; otherwise synchronize externally.
+ */
 public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
-
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorImpl.class);
     private final SpoutExecutor executor;
     private final Task taskData;
     private final int taskId;
@@ -44,33 +50,57 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
     private final Boolean isEventLoggers;
     private final Boolean isDebug;
     private final RotatingMap<Long, TupleInfo> pending;
+    private TupleInfo globalTupleInfo = new TupleInfo();  // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
+    private final long spoutExecutorThdId;
 
     @SuppressWarnings("unused")
-    public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+    public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData,
                                     MutableLong emittedCount, boolean hasAckers, Random random,
                                     Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
         this.executor = executor;
         this.taskData = taskData;
-        this.taskId = taskId;
+        this.taskId = taskData.getTaskId();
         this.emittedCount = emittedCount;
         this.hasAckers = hasAckers;
         this.random = random;
         this.isEventLoggers = isEventLoggers;
         this.isDebug = isDebug;
         this.pending = pending;
+        this.spoutExecutorThdId = executor.getThreadId();
     }
 
     @Override
     public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-        return sendSpoutMsg(streamId, tuple, messageId, null);
+        try {
+            return sendSpoutMsg(streamId, tuple, messageId, null);
+        } catch (InterruptedException e) {
+            LOG.warn("Spout thread interrupted during emit().");
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-        sendSpoutMsg(streamId, tuple, messageId, taskId);
+        try {
+            sendSpoutMsg(streamId, tuple, messageId, taskId);
+        } catch (InterruptedException e) {
+            LOG.warn("Spout thread interrupted during emitDirect().");
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
+    public void flush() {
+        try {
+            executor.getExecutorTransfer().flush();
+        } catch (InterruptedException e) {
+            LOG.warn("Spout thread interrupted during flush().");
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    @Override
     public long getPendingCount() {
         return pending.size();
     }
@@ -81,7 +111,7 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
         executor.getReportError().report(error);
     }
 
-    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
+    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws InterruptedException {
         emittedCount.increment();
 
         List<Integer> outTasks;
@@ -91,11 +121,14 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
             outTasks = taskData.getOutgoingTasks(stream, values);
         }
 
-        List<Long> ackSeq = new ArrayList<>();
-        boolean needAck = (messageId != null) && hasAckers;
+        final boolean needAck = (messageId != null) && hasAckers;
+
+        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
 
-        long rootId = MessageId.generateId(random);
-        for (Integer t : outTasks) {
+        final long rootId = needAck ? MessageId.generateId(random) : 0;
+
+        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
+            Integer t = outTasks.get(i);
             MessageId msgId;
             if (needAck) {
                 long as = MessageId.generateId(random);
@@ -105,19 +138,16 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
                 msgId = MessageId.makeUnanchored();
             }
 
-            TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
-            executor.getExecutorTransfer().transfer(t, tuple);
+            final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
+            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
+            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
         }
         if (isEventLoggers) {
-            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
         }
 
-        boolean sample = false;
-        try {
-            sample = executor.getSampler().call();
-        } catch (Exception ignored) {
-        }
         if (needAck) {
+            boolean sample = executor.samplerCheck();
             TupleInfo info = new TupleInfo();
             info.setTaskId(this.taskId);
             info.setStream(stream);
@@ -131,18 +161,24 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
 
             pending.put(rootId, info);
             List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
-            executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
+            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
         } else if (messageId != null) {
-            TupleInfo info = new TupleInfo();
-            info.setStream(stream);
-            info.setValues(values);
-            info.setMessageId(messageId);
-            info.setTimestamp(0);
-            Long timeDelta = sample ? 0L : null;
-            info.setId("0:");
-            executor.ackSpoutMsg(executor, taskData, timeDelta, info);
+            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
+            if (isDebug) {
+                if (spoutExecutorThdId != Thread.currentThread().getId()) {
+                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
+                        "Spout Output Collector should only emit from the main spout executor thread.");
+                }
+            }
+            globalTupleInfo.clear();
+            globalTupleInfo.setStream(stream);
+            globalTupleInfo.setValues(values);
+            globalTupleInfo.setMessageId(messageId);
+            globalTupleInfo.setTimestamp(0);
+            globalTupleInfo.setId("0:");
+            Long timeDelta = 0L;
+            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
         }
-
         return outTasks;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
index ad8014c..f2c792a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
@@ -28,20 +28,19 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
+
 public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
-    private Random random;
     private ArrayList<List<Integer>> choices;
     private AtomicInteger current;
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        random = new Random();
         choices = new ArrayList<List<Integer>>(targetTasks.size());
         for (Integer i: targetTasks) {
             choices.add(Arrays.asList(i));
         }
-        Collections.shuffle(choices, random);
         current = new AtomicInteger(0);
+        Collections.shuffle(choices, new Random());
     }
 
     @Override
@@ -56,8 +55,6 @@ public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
                 current.set(0);
                 return choices.get(0);
             }
-            //race condition with another thread, and we lost
-            // try again
-        }
+        } // race condition with another thread, and we lost. try again
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
index 73a7f33..3b8cb6d 100644
--- a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
@@ -33,7 +33,8 @@ public class BoltExecuteInfo {
     }
 
     public void applyOn(TopologyContext topologyContext) {
-        for (ITaskHook hook : topologyContext.getHooks()) {
+        for (int i = 0; i < topologyContext.getHooks().size(); i++) { // perf critical loop. don't use iterators
+            ITaskHook hook = topologyContext.getHooks().get(i);
             hook.boltExecute(this);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index 5d097d7..8b33465 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -18,52 +18,67 @@
 package org.apache.storm.messaging;
 
 import org.apache.storm.grouping.Load;
+import org.apache.storm.messaging.netty.BackPressureStatus;
+
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public interface IConnection {
+
     /**
      * Register a callback to be notified when data is ready to be processed.
      * @param cb the callback to process the messages.
      */
-    public void registerRecv(IConnectionCallback cb);
+    void registerRecv(IConnectionCallback cb);
+
+    /**
+     * Register a response generator to be used to send an initial response when a new client connects.
+     * @param cb the callback to process the connection.
+     */
+    void registerNewConnectionResponse(Supplier<Object> cb);
 
     /**
      * Send load metrics to all downstream connections.
      * @param taskToLoad a map from the task id to the load for that task.
      */
-    public void sendLoadMetrics(Map<Integer, Double> taskToLoad);
-    
+    void sendLoadMetrics(Map<Integer, Double> taskToLoad);
+
+    /**
+     * Sends the back pressure metrics to all downstream connections.
+     */
+    void sendBackPressureStatus(BackPressureStatus bpStatus);
+
     /**
      * send a message with taskId and payload
      * @param taskId task ID
      * @param payload
      */
-    public void send(int taskId,  byte[] payload);
+    void send(int taskId,  byte[] payload);
     
     /**
      * send batch messages
      * @param msgs
      */
 
-    public void send(Iterator<TaskMessage> msgs);
+    void send(Iterator<TaskMessage> msgs);
     
     /**
      * Get the current load for the given tasks
      * @param tasks the tasks to look for.
      * @return a Load for each of the tasks it knows about.
      */
-    public Map<Integer, Load> getLoad(Collection<Integer> tasks);
+    Map<Integer, Load> getLoad(Collection<Integer> tasks);
     
     /**
      * Get the port for this connection
      * @return The port this connection is using
      */
-    public int getPort();
+    int getPort();
 
     /**
      * close this connection
      */
-    public void close();
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index c5c2261..00b7546 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -18,6 +18,7 @@
 package org.apache.storm.messaging;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This interface needs to be implemented for messaging plugin. 
@@ -33,12 +34,12 @@ public interface IContext {
      * This method is invoked at the startup of messaging plugin
      * @param topoConf storm configuration
      */
-    public void prepare(Map<String, Object> topoConf);
+    void prepare(Map<String, Object> topoConf);
     
     /**
      * This method is invoked when a worker is unloading a messaging plugin
      */
-    public void term();
+    void term();
 
     /**
      * This method establishes a server side connection 
@@ -46,14 +47,15 @@ public interface IContext {
      * @param port port #
      * @return server side connection
      */
-    public IConnection bind(String storm_id, int port);
+    IConnection bind(String storm_id, int port);
     
     /**
      * This method establish a client side connection to a remote server
      * @param storm_id topology ID
      * @param host remote host
      * @param port remote port
+     * @param remoteBpStatus array of booleans reflecting Back Pressure status of remote tasks.
      * @return client side connection
      */
-    public IConnection connect(String storm_id, String host, int port);
+    IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index 23e934a..0babd5f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.messaging.local;
 
+import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
 import org.apache.storm.grouping.Load;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
@@ -57,6 +61,11 @@ public class Context implements IContext {
         }
 
         @Override
+        public void registerNewConnectionResponse(Supplier<Object> cb) {
+            return;
+        }
+
+        @Override
         public void send(int taskId,  byte[] payload) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
         }
@@ -84,6 +93,11 @@ public class Context implements IContext {
         }
 
         @Override
+        public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+            throw new RuntimeException("Local Server connection should not send BackPressure status");
+        }
+
+        @Override
         public int getPort() {
             return port;
         }
@@ -130,7 +144,12 @@ public class Context implements IContext {
         public void registerRecv(IConnectionCallback cb) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
         }
-        
+
+        @Override
+        public void registerNewConnectionResponse(Supplier<Object> cb) {
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+
         private void flushPending(){
             IConnectionCallback serverCb = _server._cb;
             if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
@@ -180,6 +199,11 @@ public class Context implements IContext {
         }
 
         @Override
+        public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+            throw new RuntimeException("Local Client connection should not send BackPressure status");
+        }
+
+        @Override
         public int getPort() {
             return _server.getPort();
         }
@@ -221,7 +245,7 @@ public class Context implements IContext {
     }
 
     @Override
-    public IConnection connect(String storm_id, String host, int port) {
+    public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
         return new LocalClient(getLocalServer(storm_id, port));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
new file mode 100644
index 0000000..0996ff0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+    public static final short IDENTIFIER = (short)-600;
+    private static final int SIZE_OF_ID = 2; // size of IDENTIFIER
+    private static final int SIZE_OF_INT = 4;
+
+    // JVM-wide counter used to stamp each status message with a monotonically increasing id
+    private static final AtomicLong bpCount = new AtomicLong(0);
+
+    public String workerId;
+    public final long id;                  // monotonically increasing id for correlating sent/recvd msgs. ok if restarts from 0 on crash.
+    public Collection<Integer> bpTasks;    // task Ids experiencing BP. can be null
+    public Collection<Integer> nonBpTasks; // task Ids no longer experiencing BP. can be null
+
+    // No-arg constructor required for Kryo deserialization; assigns a fresh id.
+    public BackPressureStatus() {
+        this.id = bpCount.incrementAndGet();
+    }
+
+    public BackPressureStatus(String workerId, Collection<Integer> bpTasks, Collection<Integer> nonBpTasks) {
+        this.workerId = workerId;
+        this.id = bpCount.incrementAndGet();
+        this.bpTasks = bpTasks;
+        this.nonBpTasks = nonBpTasks;
+    }
+
+    @Override
+    public String toString() {
+        return "{worker=" + workerId + ", bpStatusId=" + id + ", bpTasks=" + bpTasks + ", nonBpTasks=" + nonBpTasks + '}';
+    }
+
+    /** Encoded as
+     *  -600 ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]
+     */
+    public ChannelBuffer buffer(KryoValuesSerializer ser) throws IOException {
+        byte[] serializedBytes = ser.serializeObject(this);
+        ChannelBuffer buff = ChannelBuffers.buffer(SIZE_OF_ID + SIZE_OF_INT + serializedBytes.length);
+        buff.writeShort(IDENTIFIER);
+        buff.writeInt(serializedBytes.length);
+        buff.writeBytes(serializedBytes);
+        return buff;
+    }
+
+    /** Deserializes the payload bytes (written by {@link #buffer}) back into a status object. */
+    public static BackPressureStatus read(byte[] bytes, KryoValuesDeserializer deserializer) {
+        return (BackPressureStatus) deserializer.deserializeObject(bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index fcaf4e5..ac92c6b 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -17,25 +17,19 @@
  */
 package org.apache.storm.messaging.netty;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.lang.InterruptedException;
-
 import org.apache.storm.Config;
 import org.apache.storm.grouping.Load;
 import org.apache.storm.messaging.ConnectionWithStatus;
-import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.messaging.IConnectionCallback;
+import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
+import org.apache.storm.policy.WaitStrategyProgressive;
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -48,11 +42,22 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -77,6 +82,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private static final long NO_DELAY_MS = 0L;
     private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
 
+    KryoValuesSerializer ser;
+    KryoValuesDeserializer deser;
+
     private final Map<String, Object> topoConf;
     private final StormBoundedExponentialBackoffRetry retryPolicy;
     private final ClientBootstrap bootstrap;
@@ -140,18 +148,22 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
     private final MessageBuffer batcher;
 
-    private final Object writeLock = new Object();
+    // wait strategy when the netty channel is not writable
+    private final IWaitStrategy waitStrategy;
 
     @SuppressWarnings("rawtypes")
-    Client(Map<String, Object> topoConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
+    Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
         this.topoConf = topoConf;
         closing = false;
         this.scheduler = scheduler;
         this.context = context;
         int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
+        int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
         // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
         saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
-        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+        LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
+            host, port, bufferSize, lowWatermark, highWatermark);
         int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
 
         int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
@@ -160,13 +172,22 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
         // Initiate connection to remote destination
-        bootstrap = createClientBootstrap(factory, bufferSize, topoConf);
+        bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
         dstHost = host;
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         launchChannelAliveThread();
         scheduleConnect(NO_DELAY_MS);
         batcher = new MessageBuffer(messageBatchSize);
+        String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
+        if (clazz == null) {
+            waitStrategy = new WaitStrategyProgressive();
+        } else {
+            waitStrategy = ReflectionUtils.newInstance(clazz);
+        }
+        waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
+        ser = new KryoValuesSerializer(topoConf);
+        deser = new KryoValuesDeserializer(topoConf);
     }
 
     /**
@@ -194,12 +215,17 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         }, 0, CHANNEL_ALIVE_INTERVAL_MS);
     }
 
-    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map<String, Object> topoConf) {
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize,
+                                                  int lowWatermark, int highWatermark,
+                                                  Map<String, Object> topoConf,
+                                                  AtomicBoolean[] remoteBpStatus) {
         ClientBootstrap bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", bufferSize);
         bootstrap.setOption("keepAlive", true);
-        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, topoConf));
+        bootstrap.setOption("writeBufferLowWaterMark", lowWatermark);
+        bootstrap.setOption("writeBufferHighWaterMark", highWatermark);
+        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
         return bootstrap;
     }
 
@@ -262,11 +288,21 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     @Override
+    public void registerNewConnectionResponse(Supplier<Object> cb) {
+        throw new UnsupportedOperationException("Client does not accept new connections");
+    }
+
+    @Override
     public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
         throw new RuntimeException("Client connection should not send load metrics");
     }
 
     @Override
+    public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+        throw new RuntimeException("Client connection should not send BackPressure status");
+    }
+
+    @Override
     public void send(int taskId, byte[] payload) {
         TaskMessage msg = new TaskMessage(taskId, payload);
         List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
@@ -281,7 +317,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     public void send(Iterator<TaskMessage> msgs) {
         if (closing) {
             int numMessages = iteratorSize(msgs);
-            LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
+            LOG.error("Dropping {} messages because the Netty client to {} is being closed", numMessages,
                     dstAddressPrefixedName);
             return;
         }
@@ -302,31 +338,39 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             dropMessages(msgs);
             return;
         }
-
-        synchronized (writeLock) {
+        try {
             while (msgs.hasNext()) {
                 TaskMessage message = msgs.next();
-                MessageBatch full = batcher.add(message);
-                if(full != null){
-                    flushMessages(channel, full);
+                MessageBatch batch = batcher.add(message);
+                if (batch != null) {
+                    writeMessage(channel, batch);
                 }
             }
+            MessageBatch batch = batcher.drain();
+            if (batch != null) {
+                writeMessage(channel, batch);
+            }
+        } catch (IOException e) {
+            LOG.warn("Exception when sending message to remote worker.", e);
+            dropMessages(msgs);
         }
+    }
 
-        if(channel.isWritable()){
-            synchronized (writeLock) {
-                // Netty's internal buffer is not full and we still have message left in the buffer.
-                // We should write the unfilled MessageBatch immediately to reduce latency
-                MessageBatch batch = batcher.drain();
-                if(batch != null) {
-                    flushMessages(channel, batch);
+    // Blocks (via the configured backpressure wait strategy) until the channel is
+    // writable, then flushes the batch. Throws IOException if the connection drops.
+    private void writeMessage(Channel channel, MessageBatch batch) throws IOException {
+        try {
+            int idleCounter = 0;
+            while (!channel.isWritable()) {
+                if (idleCounter == 0) { // check avoids multiple log msgs when in an idle loop
+                    LOG.debug("Experiencing Back Pressure from Netty. Entering BackPressure Wait");
+                }
+                if (!channel.isConnected()) {
+                    throw new IOException("Connection disconnected");
                }
+                idleCounter = waitStrategy.idle(idleCounter);
            }
-        } else {
-            // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger
-            // batch. This yields better throughput.
-            // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
-            // because we know `Channel.isWritable` was false after the messages were already in the buffer.
+            flushMessages(channel, batch);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore interrupt status before rethrowing
+            throw new RuntimeException(e);
        }
    }
 
@@ -357,6 +401,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
+        LOG.info("Dropping {} messages", msgCount);
     }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
@@ -548,13 +593,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      * @param channel
      */
     public void notifyInterestChanged(Channel channel) {
-        if(channel.isWritable()){
-            synchronized (writeLock) {
-                // Channel is writable again, write if there are any messages pending
-                MessageBatch pending = batcher.drain();
-                flushMessages(channel, pending);
-            }
-        }
+        // NOOP since we are checking channel.isWritable in writeMessage
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 486cd03..5a169de 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.storm.Config;
 import org.apache.storm.messaging.IConnection;
@@ -34,7 +35,6 @@ public class Context implements IContext {
     private Map<String, Object> topoConf;
     private Map<String, IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
-    
     private HashedWheelTimer clientScheduleService;
 
     /**
@@ -72,13 +72,13 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
-    public synchronized IConnection connect(String storm_id, String host, int port) {
+    public synchronized IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
         IConnection connection = connections.get(key(host,port));
         if(connection !=null)
         {
             return connection;
         }
-        IConnection client =  new Client(topoConf, clientChannelFactory, 
+        IConnection client =  new Client(topoConf,remoteBpStatus, clientChannelFactory,
                 clientScheduleService, host, port, this);
         connections.put(key(host, client.getPort()), client);
         return client;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
index 9030424..87c16af 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -21,20 +21,28 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
-public class MessageDecoder extends FrameDecoder {    
+public class MessageDecoder extends FrameDecoder {
+
+    private KryoValuesDeserializer deser;
+
+    public MessageDecoder(KryoValuesDeserializer deser) {
+        this.deser = deser;
+    }
+
     /*
-     * Each ControlMessage is encoded as:
-     *  code (<0) ... short(2)
-     * Each TaskMessage is encoded as:
-     *  task (>=0) ... short(2)
-     *  len ... int(4)
-     *  payload ... byte[]     *  
-     */
+         * Each ControlMessage is encoded as:
+         *  code (<0) ... short(2)
+         * Each TaskMessage is encoded as:
+         *  task (>=0) ... short(2)
+         *  len ... int(4)
+         *  payload ... byte[]     *
+         */
     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
         // Make sure that we have received at least a short 
         long available = buf.readableBytes();
@@ -99,7 +107,24 @@ public class MessageDecoder extends FrameDecoder {
                 return new SaslMessageToken(payload.array());
             }
 
-            // case 3: task Message
+            // case 3: BackPressureStatus
+            if (code == BackPressureStatus.IDENTIFIER) {
+                available = buf.readableBytes();
+                if (available < 4) {
+                    // need more data: reset so the already-consumed IDENTIFIER short
+                    // is replayed when more bytes arrive (matches the branch below)
+                    buf.resetReaderIndex();
+                    return null;
+                }
+                int dataLen = buf.readInt();
+                if (available < 4 + dataLen) {
+                    // need more data
+                    buf.resetReaderIndex();
+                    return null;
+                }
+                byte[] bytes = new byte[dataLen];
+                buf.readBytes(bytes);
+                return BackPressureStatus.read(bytes, deser);
+            }
+
+            // case 4: task Message
 
             // Make sure that we have received at least an integer (length)
             if (available < 4) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
index 0e9fc98..b714c48 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
@@ -17,11 +17,19 @@
  */
 package org.apache.storm.messaging.netty;
 
+import org.apache.storm.serialization.KryoValuesSerializer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 
-public class MessageEncoder extends OneToOneEncoder {    
+public class MessageEncoder extends OneToOneEncoder {
+
+    private KryoValuesSerializer ser;
+
+    public MessageEncoder(KryoValuesSerializer ser) {
+        this.ser = ser;
+    }
+
     @Override
     protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
         if (obj instanceof ControlMessage) {
@@ -30,8 +38,12 @@ public class MessageEncoder extends OneToOneEncoder {
 
         if (obj instanceof MessageBatch) {
             return ((MessageBatch)obj).buffer();
-        } 
-        
+        }
+
+        if (obj instanceof BackPressureStatus) {
+            return ((BackPressureStatus)obj).buffer(ser);
+        }
+
         if (obj instanceof SaslMessageToken) {
         	return ((SaslMessageToken)obj).buffer();
         }