Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:41 UTC

[6/9] storm git commit: STORM-2306 - Messaging subsystem redesign. New Backpressure model.

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index b2e2b4a..0f54ce1 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,8 +18,8 @@
 
 package org.apache.storm;
 
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Comparator;
@@ -97,6 +97,9 @@ public class StormTimer implements AutoCloseable {
                         // events.
                         Time.sleep(1000);
                     }
+                    if (Thread.interrupted()) {
+                        this.active.set(false);
+                    }
                 } catch (Throwable e) {
                     if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))
                             && !(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e))) {
@@ -158,6 +161,17 @@ public class StormTimer implements AutoCloseable {
      * @param jitterMs add jitter to the run
      */
     public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+        scheduleMs(Time.secsToMillisLong(delaySecs), func, checkActive, jitterMs);
+    }
+
+    /**
+     * Same as schedule, but with millisecond resolution.
+     * @param delayMs the number of milliseconds to delay before running the function
+     * @param func the function to run
+     * @param checkActive whether to check if the timer is active
+     * @param jitterMs add jitter to the run
+     */
+    public void scheduleMs(long delayMs, Runnable func, boolean checkActive, int jitterMs) {
         if (func == null) {
             throw new RuntimeException("function to schedule is null!");
         }
@@ -165,7 +179,7 @@ public class StormTimer implements AutoCloseable {
             checkActive();
         }
         String id = Utils.uuid();
-        long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
+        long endTimeMs = Time.currentTimeMillis() + delayMs;
         if (jitterMs > 0) {
             endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
         }
@@ -176,6 +190,10 @@ public class StormTimer implements AutoCloseable {
         schedule(delaySecs, func, true, 0);
     }
 
+    public void scheduleMs(long delayMs, Runnable func) {
+        scheduleMs(delayMs, func, true, 0);
+    }
+
     /**
      * Schedule a function to run recurrently
      * @param delaySecs the number of seconds to delay before running the function
@@ -194,6 +212,24 @@ public class StormTimer implements AutoCloseable {
     }
 
     /**
+     * Schedule a function to run recurrently
+     * @param delayMs the number of millis to delay before running the function
+     * @param recurMs the time between each invocation
+     * @param func the function to run
+     */
+    public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
+        scheduleMs(delayMs, new Runnable() {
+            @Override
+            public void run() {
+                func.run();
+                // This avoids a race condition with cancel-timer.
+                scheduleMs(recurMs, this, true, 0);
+            }
+        });
+    }
+
+
+    /**
      * schedule a function to run recurrently with jitter
      * @param delaySecs the number of seconds to delay before running the function
      * @param recurSecs the time between each invocation

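For context, a minimal sketch of how the new millisecond-resolution API might be driven (this assumes StormTimer's constructor takes a thread name and an uncaught-exception handler, as in storm-client; all names below are illustrative):

import org.apache.storm.StormTimer;

public class TimerSketch {
    public static void main(String[] args) throws Exception {
        StormTimer timer = new StormTimer("sketch-timer",
                (thread, error) -> error.printStackTrace());
        // One-shot task 250 ms from now; the old schedule() could only express whole seconds.
        timer.scheduleMs(250, () -> System.out.println("one-shot"));
        // Recurring task: 500 ms initial delay, then every 100 ms. Rescheduling from inside
        // the previous run, as scheduleRecurringMs does, avoids racing with cancel-timer.
        timer.scheduleRecurringMs(500, 100, () -> System.out.println("tick"));
        Thread.sleep(1000);
        timer.close();
    }
}
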
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index c61c104..362d4dd 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index adc434c..ab893be 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -40,21 +40,21 @@ import org.apache.storm.generated.WorkerTokenServiceType;
 import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
-    public List<String> assignments(Runnable callback);
+    List<String> assignments(Runnable callback);
 
-    public Assignment assignmentInfo(String stormId, Runnable callback);
+    Assignment assignmentInfo(String stormId, Runnable callback);
 
-    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+    VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
 
-    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+    Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
 
-    public List<String> blobstoreInfo(String blobKey);
+    List<String> blobstoreInfo(String blobKey);
 
-    public List<NimbusSummary> nimbuses();
+    List<NimbusSummary> nimbuses();
 
-    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+    void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
-    public List<String> activeStorms();
+    List<String> activeStorms();
 
     /**
      * Get a storm base for a topology
@@ -62,87 +62,95 @@ public interface IStormClusterState {
      * @param callback something to call if the data changes (best effort)
      * @return the StormBase or null if it is not alive.
      */
-    public StormBase stormBase(String stormId, Runnable callback);
+    StormBase stormBase(String stormId, Runnable callback);
 
-    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+    ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
 
-    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+    List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
 
-    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+    List<ProfileRequest> getTopologyProfileRequests(String stormId);
 
-    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+    void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
 
-    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+    void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
 
-    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+    Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
 
-    public List<String> supervisors(Runnable callback);
+    List<String> supervisors(Runnable callback);
 
-    public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+    SupervisorInfo supervisorInfo(String supervisorId); // returns null if it doesn't exist
 
-    public void setupHeatbeats(String stormId);
+    void setupHeatbeats(String stormId);
 
-    public void teardownHeartbeats(String stormId);
+    void teardownHeartbeats(String stormId);
 
-    public void teardownTopologyErrors(String stormId);
+    void teardownTopologyErrors(String stormId);
 
-    public List<String> heartbeatStorms();
+    List<String> heartbeatStorms();
 
-    public List<String> errorTopologies();
+    List<String> errorTopologies();
 
-    public List<String> backpressureTopologies();
+    /** @deprecated Deprecated in Storm 2.0; retained only to ease the transition from 1.x. Will be removed soon. */
+    @Deprecated
+    List<String> backpressureTopologies();
 
-    public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+    void setTopologyLogConfig(String stormId, LogConfig logConfig);
 
-    public LogConfig topologyLogConfig(String stormId, Runnable cb);
+    LogConfig topologyLogConfig(String stormId, Runnable cb);
 
-    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
+    void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
 
-    public void removeWorkerHeartbeat(String stormId, String node, Long port);
+    void removeWorkerHeartbeat(String stormId, String node, Long port);
 
-    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
+    void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
 
-    public void workerBackpressure(String stormId, String node, Long port, long timestamp);
+    /** @deprecated Deprecated in Storm 2.0; retained only to ease the transition from 1.x. Will be removed soon. */
+    @Deprecated
+    boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
 
-    public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
+    /** @deprecated Deprecated in Storm 2.0; retained only to ease the transition from 1.x. Will be removed soon. */
+    @Deprecated
+    void setupBackpressure(String stormId);
 
-    public void setupBackpressure(String stormId);
+    /** @deprecated Deprecated in Storm 2.0; retained only to ease the transition from 1.x. Will be removed soon. */
+    @Deprecated
+    void removeBackpressure(String stormId);
 
-    public void removeBackpressure(String stormId);
+    /** @deprecated Deprecated in Storm 2.0; retained only to ease the transition from 1.x. Will be removed soon. */
+    @Deprecated
+    void removeWorkerBackpressure(String stormId, String node, Long port);
 
-    public void removeWorkerBackpressure(String stormId, String node, Long port);
+    void activateStorm(String stormId, StormBase stormBase);
 
-    public void activateStorm(String stormId, StormBase stormBase);
+    void updateStorm(String stormId, StormBase newElems);
 
-    public void updateStorm(String stormId, StormBase newElems);
+    void removeStormBase(String stormId);
 
-    public void removeStormBase(String stormId);
+    void setAssignment(String stormId, Assignment info);
 
-    public void setAssignment(String stormId, Assignment info);
+    void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
 
-    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
+    List<String> activeKeys();
 
-    public List<String> activeKeys();
+    List<String> blobstore(Runnable callback);
 
-    public List<String> blobstore(Runnable callback);
+    void removeStorm(String stormId);
 
-    public void removeStorm(String stormId);
+    void removeBlobstoreKey(String blobKey);
 
-    public void removeBlobstoreKey(String blobKey);
+    void removeKeyVersion(String blobKey);
 
-    public void removeKeyVersion(String blobKey);
+    void reportError(String stormId, String componentId, String node, Long port, Throwable error);
 
-    public void reportError(String stormId, String componentId, String node, Long port, Throwable error);
+    List<ErrorInfo> errors(String stormId, String componentId);
 
-    public List<ErrorInfo> errors(String stormId, String componentId);
+    ErrorInfo lastError(String stormId, String componentId);
 
-    public ErrorInfo lastError(String stormId, String componentId);
+    void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
 
-    public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
+    Credentials credentials(String stormId, Runnable callback);
 
-    public Credentials credentials(String stormId, Runnable callback);
-
-    public void disconnect();
+    void disconnect();
 
     /**
      * Get a private key used to validate a token is correct.

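During the transition period, a caller that still needs the ZooKeeper-based check must opt into the deprecated API explicitly. A minimal sketch (the 30-second staleness bound and the null callback are illustrative choices, not defaults):

import org.apache.storm.cluster.IStormClusterState;

public class ThrottleProbe {
    @SuppressWarnings("deprecation")
    static boolean throttleOn(IStormClusterState state, String stormId) {
        // timeoutMs bounds how stale a backpressure znode may be; a null callback sets no watch.
        return state.topologyBackpressure(stormId, 30_000L, null);
    }
}
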
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 11757c0..719ebbf 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -71,6 +71,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     private AtomicReference<Runnable> supervisorsCallback;
     // we want to register a topo directory getChildren callback for all workers of this dir
     private ConcurrentHashMap<String, Runnable> backPressureCallback;
+
     private AtomicReference<Runnable> assignmentsCallback;
     private ConcurrentHashMap<String, Runnable> stormBaseCallback;
     private AtomicReference<Runnable> blobstoreCallback;
@@ -398,6 +399,11 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
+    public List<String> backpressureTopologies() {
+        return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
+    }
+
+    @Override
     public List<String> heartbeatStorms() {
         return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
     }
@@ -408,11 +414,6 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public List<String> backpressureTopologies() {
-        return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
-    }
-
-    @Override
     public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
         stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), defaultAcls);
     }
@@ -447,35 +448,6 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * If znode exists and timestamp is non-positive, delete;
-     * if exists and timestamp is larger than 0, update the timestamp;
-     * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
-     * if not exists and timestamp is non-positive, do nothing.
-     * @param stormId The topology Id
-     * @param node The node id
-     * @param port The port number
-     * @param timestamp The backpressure timestamp. Non-positive means turning off the worker backpressure
-     */
-    @Override
-    public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
-        String path = ClusterUtils.backpressurePath(stormId, node, port);
-        boolean existed = stateStorage.node_exists(path, false);
-        if (existed) {
-            if (timestamp <= 0) {
-                stateStorage.delete_node(path);
-            } else {
-                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
-                stateStorage.set_data(path, data, defaultAcls);
-            }
-        } else {
-            if (timestamp > 0) {
-                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
-                stateStorage.set_ephemeral_node(path, data, defaultAcls);
-            }
-        }
-    }
-
-    /**
      * Check whether a topology is in throttle-on status or not:
      * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
      * But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
@@ -495,17 +467,16 @@ public class StormClusterStateImpl implements IStormClusterState {
         if(stateStorage.node_exists(path, false)) {
             List<String> children = stateStorage.get_children(path, callback != null);
             mostRecentTimestamp = children.stream()
-                    .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
-                    .filter(data -> data != null)
-                    .mapToLong(data -> ByteBuffer.wrap(data).getLong())
-                    .max()
-                    .orElse(0);
+                .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+                .filter(data -> data != null)
+                .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+                .max()
+                .orElse(0);
         }
         boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
         LOG.debug("topology backpressure is {}", ret ? "on" : "off");
         return ret;
     }
-
     @Override
     public void setupBackpressure(String stormId) {
         stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), defaultAcls);
@@ -533,13 +504,18 @@ public class StormClusterStateImpl implements IStormClusterState {
             stateStorage.delete_node(path);
         }
     }
-
     @Override
     public void activateStorm(String stormId, StormBase stormBase) {
         String path = ClusterUtils.stormPath(stormId);
         stateStorage.set_data(path, Utils.serialize(stormBase), defaultAcls);
     }
 
+    /**
+     * Needs updating: APersistentMap/APersistentSet are Clojure structures.
+     *
+     * @param stormId the topology id
+     * @param newElems the new StormBase elements to apply
+     */
     @Override
     public void updateStorm(String stormId, StormBase newElems) {
 
@@ -864,7 +840,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         return ret;
     }
     
-    private List<String> tokenizePath(String path) {
+    private static List<String> tokenizePath(String path) {
         String[] toks = path.split("/");
         java.util.ArrayList<String> rtn = new ArrayList<>();
         for (String str : toks) {

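The throttle check above reduces to one rule: throttle is on only if some worker wrote a backpressure timestamp within the last timeoutMs. A self-contained sketch of that rule, decoupled from ZooKeeper:

import java.util.List;

public class ThrottleRule {
    static boolean throttleOn(List<Long> workerTimestamps, long timeoutMs) {
        long mostRecent = workerTimestamps.stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(0); // no worker znodes => timestamp 0 => effectively throttle-off
        return (System.currentTimeMillis() - mostRecent) < timeoutMs;
    }
}
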
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
index e1581eb..3669c1b 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
@@ -40,7 +40,12 @@ public abstract class BatchOutputCollector {
         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
     }
     
-    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); 
-    
+    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
+    /**
+     * Flush any buffered tuples (when batching is enabled)
+     */
+    public abstract void flush();
+
     public abstract void reportError(Throwable error);
 }

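The new flush() hook only matters when a collector buffers its emits. A toy sketch of the contract (this buffering collector is illustrative, not Storm's implementation):

import java.util.ArrayList;
import java.util.List;

public class BufferingCollectorSketch {
    private final List<List<Object>> buffer = new ArrayList<>();

    public void emit(List<Object> tuple) {
        buffer.add(tuple); // with batching enabled, emits are only staged here
    }

    public void flush() {
        for (List<Object> tuple : buffer) {
            System.out.println("transfer " + tuple); // stand-in for the real downstream send
        }
        buffer.clear();
    }
}
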
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
index 246ae5d..8814f77 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
@@ -39,6 +39,11 @@ public class BatchOutputCollectorImpl extends BatchOutputCollector {
     }
 
     @Override
+    public void flush() {
+        _collector.flush();
+    }
+
+    @Override
     public void reportError(Throwable error) {
         _collector.reportError(error);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
index b0706c2..b32b05e 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -124,10 +124,14 @@ public class CoordinatedBolt implements IRichBolt {
             _delegate.fail(tuple);
         }
 
+        public void flush() {
+            _delegate.flush();
+        }
+
         public void resetTimeout(Tuple tuple) {
             _delegate.resetTimeout(tuple);
         }
-        
+
         public void reportError(Throwable error) {
             _delegate.reportError(error);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
index 8675e39..58a14e2 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.daemon;
 
+import org.apache.storm.Constants;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -102,14 +103,16 @@ public class Acker implements IBolt {
             if (curr != null) {
                 pending.put(id, curr);
             } //else if it has not been added yet, there is no reason to time it out later on
+        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
+            collector.flush();
+            return;
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
             return;
         }
 
         int task = curr.spoutTask;
-        if (curr != null && task >= 0
-            && (curr.val == 0 || curr.failed || resetTimeout)) {
+        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
             Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
             if (curr.val == 0) {
                 pending.remove(id);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 179e882..bd0ac9a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -137,7 +137,7 @@ public class GrouperFactory {
     public static class FieldsGrouper implements CustomStreamGrouping {
 
         private Fields outFields;
-        private List<Integer> targetTasks;
+        private List<List<Integer>> targetTasks;
         private Fields groupFields;
         private int numTasks;
 
@@ -149,14 +149,17 @@ public class GrouperFactory {
 
         @Override
         public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-            this.targetTasks = targetTasks;
+            this.targetTasks = new ArrayList<List<Integer>>();
+            for (Integer targetTask : targetTasks) {
+                this.targetTasks.add(Collections.singletonList(targetTask));
+            }
             this.numTasks = targetTasks.size();
         }
 
         @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values) {
             int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
-            return Collections.singletonList(targetTasks.get(targetTaskIndex));
+            return targetTasks.get(targetTaskIndex);
         }
 
     }

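The FieldsGrouper change above trades a little memory at prepare() time for zero allocation on the per-tuple path. The same pattern in isolation:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PrecomputedTargets {
    private final List<List<Integer>> perTask = new ArrayList<>();

    public PrecomputedTargets(List<Integer> targetTasks) {
        for (Integer t : targetTasks) {
            perTask.add(Collections.singletonList(t)); // one allocation per task, up front
        }
    }

    public List<Integer> choose(int index) {
        return perTask.get(index); // hot path: returns a cached list, no allocation
    }
}
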
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index f6053b9..f2ebd5f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -126,7 +126,7 @@ public class StormCommon {
         }
         return keys;
     }
-    
+
     private static void validateIds(StormTopology topology) throws InvalidTopologyException {
         List<String> componentIds = new ArrayList<>();
         componentIds.addAll(validateIds(topology.get_bolts()));
@@ -362,6 +362,9 @@ public class StormCommon {
     public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
         Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                 ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        if (numExecutors == null || numExecutors == 0) {
+            return;
+        }
         HashMap<String, Object> componentConf = new HashMap<>();
         componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
         componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
@@ -442,6 +445,7 @@ public class StormCommon {
     public static void addSystemComponents(Map<String, Object> conf, StormTopology topology) {
         Map<String, StreamInfo> outputStreams = new HashMap<>();
         outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+        outputStreams.put(Constants.SYSTEM_FLUSH_STREAM_ID, Thrift.outputFields(Arrays.asList()));
         outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
         outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 658aadc..ce9d0e4 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,23 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Random;
+import java.util.function.BooleanSupplier;
+
+
 import org.apache.storm.Config;
 import org.apache.storm.Thrift;
-import org.apache.storm.daemon.metrics.BuiltinMetrics;
-import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorTransfer;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.DebugOptions;
 import org.apache.storm.generated.JavaObject;
 import org.apache.storm.generated.ShellComponent;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StateSpoutSpec;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
-import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.hooks.info.EmitInfo;
 import org.apache.storm.spout.ShellSpout;
@@ -39,20 +51,15 @@ import org.apache.storm.stats.CommonStats;
 import org.apache.storm.task.ShellBolt;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
 public class Task {
 
     private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@@ -66,21 +73,21 @@ public class Task {
     private String componentId;
     private Object taskObject; // Spout/Bolt object
     private Map<String, Object> topoConf;
-    private Callable<Boolean> emitSampler;
+    private BooleanSupplier emitSampler;
     private CommonStats executorStats;
     private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
-    private BuiltinMetrics builtInMetrics;
+    private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
     private boolean debug;
 
     public Task(Executor executor, Integer taskId) throws IOException {
         this.taskId = taskId;
         this.executor = executor;
         this.workerData = executor.getWorkerData();
-        this.topoConf = executor.getStormConf();
+        this.topoConf = executor.getTopoConf();
         this.componentId = executor.getComponentId();
         this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
+        this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper);
         this.executorStats = executor.getStats();
-        this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
         this.workerTopologyContext = executor.getWorkerTopologyContext();
         this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
         this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
@@ -103,9 +110,12 @@ public class Task {
         if (grouping != null && grouping != GrouperFactory.DIRECT) {
             throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
         }
-        new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+        if (!userTopologyContext.getHooks().isEmpty()) {
+            new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+        }
+
         try {
-            if (emitSampler.call()) {
+            if (emitSampler.getAsBoolean()) {
                 executorStats.emittedTuple(stream);
                 if (null != outTaskId) {
                     executorStats.transferredTuples(stream, 1);
@@ -120,28 +130,33 @@ public class Task {
         return new ArrayList<>(0);
     }
 
+
     public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
         if (debug) {
             LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
         }
 
-        List<Integer> outTasks = new ArrayList<>();
-        if (!streamComponentToGrouper.containsKey(stream)) {
-            throw new IllegalArgumentException("Unknown stream ID: " + stream);
-        }
-        if (null != streamComponentToGrouper.get(stream)) {
-            // null value for __system
-            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+        ArrayList<Integer> outTasks = new ArrayList<>();
+
+        ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
+        if (null != groupers)  {
+            for (int i = 0; i < groupers.size(); ++i) {
+                LoadAwareCustomStreamGrouping grouper = groupers.get(i);
                 if (grouper == GrouperFactory.DIRECT) {
                     throw new IllegalArgumentException("Cannot do regular emit to direct stream");
                 }
                 List<Integer> compTasks = grouper.chooseTasks(taskId, values);
                 outTasks.addAll(compTasks);
             }
+        } else {
+            throw new IllegalArgumentException("Unknown stream ID: " + stream);
+        }
+
+        if (!userTopologyContext.getHooks().isEmpty()) {
+            new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
         }
-        new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
         try {
-            if (emitSampler.call()) {
+            if (emitSampler.getAsBoolean()) {
                 executorStats.emittedTuple(stream);
                 executorStats.transferredTuples(stream, outTasks.size());
             }
@@ -152,7 +167,7 @@ public class Task {
     }
 
     public Tuple getTuple(String stream, List values) {
-        return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream);
+        return new TupleImpl(systemTopologyContext, values, executor.getComponentId(), systemTopologyContext.getThisTaskId(), stream);
     }
 
     public Integer getTaskId() {
@@ -171,8 +186,33 @@ public class Task {
         return taskObject;
     }
 
-    public BuiltinMetrics getBuiltInMetrics() {
-        return builtInMetrics;
+
+    // Non-blocking call. Tuples that cannot be emitted to a destination immediately are added to the `pendingEmits` argument.
+    public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
+        Tuple tuple = getTuple(stream, values);
+        List<Integer> tasks = getOutgoingTasks(stream, values);
+        for (Integer t : tasks) {
+            AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+            transfer.tryTransfer(addressedTuple, pendingEmits);
+        }
+    }
+
+    /**
+     * Send sampled data to the event logger if the global or component-level debug flag is set (via the Nimbus API).
+     */
+    public void sendToEventLogger(Executor executor, List values,
+                                  String componentId, Object messageId, Random random, Queue<AddressedTuple> overflow) {
+        Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
+        DebugOptions debugOptions = componentDebug.get(componentId);
+        if (debugOptions == null) {
+            debugOptions = componentDebug.get(executor.getStormId());
+        }
+        double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
+        if (spct > 0 && (random.nextDouble() * 100) < spct) {
+            sendUnanchored(StormCommon.EVENTLOGGER_STREAM_ID,
+                    new Values(componentId, messageId, System.currentTimeMillis(), values),
+                    executor.getExecutorTransfer(), overflow);
+        }
     }
 
     private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
@@ -190,7 +230,7 @@ public class Task {
                     ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
                     ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
             taskId,
-            workerData.getPort(), workerData.getTaskIds(),
+            workerData.getPort(), workerData.getLocalTaskIds(),
             workerData.getDefaultSharedResources(),
             workerData.getUserSharedResources(),
             executor.getSharedExecutorData(),
@@ -244,4 +284,26 @@ public class Task {
         }
     }
 
+    private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
+        HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> result = new HashMap<>(streamComponentToGrouper.size());
+
+        for (Entry<String, Map<String, LoadAwareCustomStreamGrouping>> entry : streamComponentToGrouper.entrySet()) {
+            String stream = entry.getKey();
+            Map<String, LoadAwareCustomStreamGrouping> groupers = entry.getValue();
+            ArrayList<LoadAwareCustomStreamGrouping> perStreamGroupers = new ArrayList<>();
+            if (groupers != null) { // null for __system bolt
+                for (LoadAwareCustomStreamGrouping grouper : groupers.values()) {
+                    perStreamGroupers.add(grouper);
+                }
+            }
+            result.put(stream, perStreamGroupers);
+        }
+        return result;
+    }
+
+
+    @Override
+    public String toString() {
+        return taskId.toString();
+    }
 }

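sendUnanchored() above is deliberately non-blocking: a tuple the transfer layer cannot accept right now is parked in the caller-owned pendingEmits queue instead of stalling the executor. A simplified stand-in for that pattern (plain JDK queues, not Storm's JCQueue or ExecutorTransfer):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NonBlockingSendSketch {
    private final BlockingQueue<String> transferQueue = new ArrayBlockingQueue<>(1024);

    void tryTransfer(String tuple, Queue<String> pendingEmits) {
        if (!transferQueue.offer(tuple)) { // queue full: do not block the executor thread
            pendingEmits.add(tuple);       // the caller retries pending tuples later
        }
    }

    public static void main(String[] args) {
        NonBlockingSendSketch sender = new NonBlockingSendSketch();
        Queue<String> pending = new ArrayDeque<>();
        sender.tryTransfer("tuple-1", pending);
        System.out.println("pending: " + pending.size());
    }
}
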
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index ccb1f71..8237ad1 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -21,25 +21,12 @@ import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.metric.api.StateMetric;
-import org.apache.storm.stats.BoltExecutorStats;
-import org.apache.storm.stats.CommonStats;
-import org.apache.storm.stats.SpoutExecutorStats;
-import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.task.TopologyContext;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public class BuiltinMetricsUtil {
-    public static BuiltinMetrics mkData(String type, CommonStats stats) {
-        if (StatsUtil.SPOUT.equals(type)) {
-            return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
-        } else if (StatsUtil.BOLT.equals(type)) {
-            return new BuiltinBoltMetrics((BoltExecutorStats) stats);
-        }
-        throw new RuntimeException("Invalid component type!");
-    }
-
     public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) {
         if (server instanceof IStatefulObject) {
             registerMetric("__recv-iconnection", new StateMetric((IStatefulObject) server), topoConf, context);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
index a5f26b8..cd326f0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
@@ -22,24 +22,25 @@ import org.apache.storm.metric.api.CountMetric;
 
 public class SpoutThrottlingMetrics extends BuiltinMetrics {
     private final CountMetric skippedMaxSpoutMs = new CountMetric();
-    private final CountMetric skippedThrottleMs = new CountMetric();
     private final CountMetric skippedInactiveMs = new CountMetric();
+    private final CountMetric skippedBackPressureMs = new CountMetric();
 
     public SpoutThrottlingMetrics() {
         metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
-        metricMap.put("skipped-throttle-ms", skippedThrottleMs);
         metricMap.put("skipped-inactive-ms", skippedInactiveMs);
+        metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
+
     }
 
     public void skippedMaxSpoutMs(long ms) {
         this.skippedMaxSpoutMs.incrBy(ms);
     }
 
-    public void skippedThrottleMs(long ms) {
-        this.skippedThrottleMs.incrBy(ms);
-    }
-
     public void skippedInactiveMs(long ms) {
         this.skippedInactiveMs.incrBy(ms);
     }
+
+    public void skippedBackPressureMs(long ms) {
+        this.skippedBackPressureMs.incrBy(ms);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
index 29e75ed..42d2bb6 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -123,8 +123,8 @@ public class ClientSupervisorUtils {
         }
         final Process process = builder.start();
         if (logPrefix != null || exitCodeCallback != null) {
-            Utils.asyncLoop(new Callable<Object>() {
-                public Object call() {
+            Utils.asyncLoop(new Callable<Long>() {
+                public Long call() {
                     if (logPrefix != null ) {
                         Utils.readAndLogStream(logPrefix,
                                 process.getInputStream());

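The Callable<Object> to Callable<Long> change matches the contract of Utils.asyncLoop: the callable returns how long to sleep before the next iteration, or null to stop. A stand-in loop with that contract (the seconds unit is an assumption here, not taken from this diff):

import java.util.concurrent.Callable;

public class AsyncLoopSketch {
    static Thread asyncLoop(Callable<Long> afn) {
        Thread t = new Thread(() -> {
            try {
                Long sleepSecs;
                while ((sleepSecs = afn.call()) != null) { // a null return ends the loop
                    Thread.sleep(sleepSecs * 1000L);       // assumption: value is in seconds
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }
}
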
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
new file mode 100644
index 0000000..9b31c1a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+/***
+ *   Tracks the BackPressure status using a Map<TaskId, JCQueue>.
+ *   The special value NONE indicates that a task is not under BackPressure.
+ *   ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
+ */
+public class BackPressureTracker {
+    private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null) { };
+
+    static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
+
+    private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+    private final String workerId;
+
+    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+        this.workerId = workerId;
+        for (Integer taskId : allLocalTasks) {
+            if (taskId != SYSTEM_TASK_ID) {
+                tasks.put(taskId, NONE);  // all tasks are considered to be not under BP initially
+            }
+        }
+    }
+
+    private void recordNoBackPressure(Integer taskId) {
+        tasks.put(taskId, NONE);
+    }
+
+    /***
+     * Record BP for a task.
+     * This is called by transferLocalBatch() on the NettyWorker thread.
+     * @return true if an update was recorded, false if taskId is already under BP
+     */
+    public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
+        return tasks.put(taskId, recvQ) == NONE;
+    }
+
+    // returns true if there was a change in the BP situation
+    public boolean refreshBpTaskList() {
+        boolean changed = false;
+        LOG.debug("Running Back Pressure status change check");
+        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
+            if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) {
+                recordNoBackPressure(entry.getKey());
+                changed = true;
+            }
+        }
+        return changed;
+    }
+
+    public BackPressureStatus getCurrStatus() {
+        ArrayList<Integer> bpTasks = new ArrayList<>(tasks.size());
+        ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
+
+        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
+            JCQueue q = entry.getValue();
+            if (q != NONE) {
+                bpTasks.add(entry.getKey());
+            } else {
+                nonBpTasks.add(entry.getKey());
+            }
+        }
+        return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
+    }
+}

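BackPressureTracker leans on two ConcurrentHashMap properties: null values are rejected (hence the NONE sentinel) and put() returns the previous value (hence the cheap state-change detection in recordBackPressure). The pattern in isolation:

import java.util.concurrent.ConcurrentHashMap;

public class SentinelMapSketch {
    private static final Object NONE = new Object(); // stands in for "not under backpressure"
    private final ConcurrentHashMap<Integer, Object> tasks = new ConcurrentHashMap<>();

    void register(int taskId) {
        tasks.put(taskId, NONE);
    }

    // True only on the transition from NONE to a real queue, i.e. an actual state change.
    boolean recordBackPressure(int taskId, Object recvQueue) {
        return tasks.put(taskId, recvQueue) == NONE;
    }
}
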
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 57ceb39..ca0d4d0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -19,7 +19,6 @@
 package org.apache.storm.daemon.worker;
 
 import com.google.common.base.Preconditions;
-import com.lmax.disruptor.EventHandler;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,7 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -41,6 +39,7 @@ import javax.security.auth.Subject;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
@@ -60,21 +59,19 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
-import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.DisruptorBackpressureCallback;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.WorkerBackpressureCallback;
-import org.apache.storm.utils.WorkerBackpressureThread;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 
 public class Worker implements Shutdownable, DaemonCommon {
@@ -93,9 +90,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private WorkerState workerState;
     private AtomicReference<List<IRunningExecutor>> executorsAtom;
     private Thread transferThread;
-    private WorkerBackpressureThread backpressureThread;
-    // How long until the backpressure znode is invalid.
-    private long backpressureZnodeTimeoutMs;
+
     private AtomicReference<Credentials> credentialsAtom;
     private Subject subject;
     private Collection<IAutoCredentials> autoCreds;
@@ -151,138 +146,160 @@ public class Worker implements Shutdownable, DaemonCommon {
         }
         autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
         subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
-        backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
-
-        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
-            @Override public Object run() throws Exception {
-                workerState =
-                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
-                        stormClusterState, autoCreds);
-
-                // Heartbeat here so that worker process dies if this fails
-                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
-                // that worker is running and moves on
-                doHeartBeat();
-
-                executorsAtom = new AtomicReference<>(null);
-
-                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
-                // to the supervisor
-                workerState.heartbeatTimer
-                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
-                        try {
-                            doHeartBeat();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
-
-                workerState.executorHeartbeatTimer
-                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
-                        Worker.this::doExecutorHeartbeats);
-
-                workerState.registerCallbacks();
-
-                workerState.refreshConnections(null);
-
-                workerState.activateWorkerWhenAllConnectionsReady();
-
-                workerState.refreshStormActive(null);
-
-                workerState.runWorkerStartHooks();
-
-                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
-                for (List<Long> e : workerState.getExecutors()) {
-                    if (ConfigUtils.isLocalMode(topologyConf)) {
-                        newExecutors.add(
-                            LocalExecutor.mkExecutor(workerState, e, initCreds)
-                                .execute());
-                    } else {
-                        newExecutors.add(
-                            Executor.mkExecutor(workerState, e, initCreds)
-                                .execute());
-                    }
+
+        Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
+            () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+        );
+
+    }
+
+    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+                              Map<String, String> initCreds, Credentials initialCredentials)
+        throws Exception {
+        workerState = new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+            stormClusterState, autoCreds);
+
+        // Heartbeat here so that worker process dies if this fails
+        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+        // that worker is running and moves on
+        doHeartBeat();
+
+        executorsAtom = new AtomicReference<>(null);
+
+        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+        // to the supervisor
+        workerState.heartbeatTimer
+            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+                try {
+                    doHeartBeat();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+        workerState.executorHeartbeatTimer
+            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+                Worker.this::doExecutorHeartbeats);
+
+        workerState.registerCallbacks();
+
+        workerState.refreshConnections(null);
+
+        workerState.activateWorkerWhenAllConnectionsReady();
+
+        workerState.refreshStormActive(null);
+
+        workerState.runWorkerStartHooks();
+
+        List<Executor> execs = new ArrayList<>();
+        for (List<Long> e : workerState.getLocalExecutors()) {
+            if (ConfigUtils.isLocalMode(topologyConf)) {
+                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
+                execs.add(executor);
+                for (int i = 0; i < executor.getTaskIds().size(); ++i) {
+                    workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
                 }
-                executorsAtom.set(newExecutors);
-
-                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
-                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
-
-                // This thread will publish the messages destined for remote tasks to remote connections
-                transferThread = Utils.asyncLoop(() -> {
-                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
-                    return 0L;
-                });
-
-                DisruptorBackpressureCallback disruptorBackpressureHandler =
-                    mkDisruptorBackpressureHandler(workerState);
-                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
-                workerState.transferQueue
-                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
-                workerState.transferQueue
-                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-                workerState.transferQueue
-                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
-                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
-                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
-                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
-                    backpressureThread.start();
-                    stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
-                    
-                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
-                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
+            } else {
+                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
+                for (int i = 0; i < executor.getTaskIds().size(); ++i) {
+                    workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
                 }
+                execs.add(executor);
+            }
+        }
+
+        List<IRunningExecutor> newExecutors = new ArrayList<>();
+        for (Executor executor : execs) {
+            newExecutors.add(executor.execute());
+        }
+        executorsAtom.set(newExecutors);
+
+
+        // This thread will send out messages destined for remote tasks (on other workers)
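+        // A single-worker topology has no remote tasks, so the transfer thread is not needed.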
+        if (((Long) topologyConf.get(Config.TOPOLOGY_WORKERS)) > 1) {
+            transferThread = workerState.makeTransferThread();
+            transferThread.setName("Worker-Transfer");
+        }
+
+        credentialsAtom = new AtomicReference<>(initialCredentials);
 
-                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
-
-                establishLogSettingCallback();
-
-                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
-
-                workerState.refreshCredentialsTimer.scheduleRecurring(0,
-                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
-                        @Override public void run() {
-                            checkCredentialsChanged();
-                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
-                               checkThrottleChanged();
-                            }
-                        }
-                    });
-
-                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
-                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
-                            @Override public void run() {
-                                try {
-                                    LOG.debug("Checking if blobs have updated");
-                                    updateBlobUpdates();
-                                } catch (IOException e) {
-                                    // IOException from reading the version files to be ignored
-                                    LOG.error(e.getStackTrace().toString());
-                                }
-                            }
-                        });
-
-                // The jitter allows the clients to get the data at different times, and avoids thundering herd
-                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
+        establishLogSettingCallback();
+
+        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+
+        workerState.refreshCredentialsTimer.scheduleRecurring(0,
+            (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), Worker.this::checkCredentialsChanged);
+
+        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+            (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10),
+            () -> {
+                try {
+                    LOG.debug("Checking if blobs have updated");
+                    updateBlobUpdates();
+                } catch (IOException e) {
+                    // IOExceptions from reading the version files are logged but otherwise ignored
+                    LOG.error("Failed to check for updated blobs", e);
                 }
+            }
+        );
 
-                workerState.refreshConnectionsTimer.scheduleRecurring(0,
-                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+        // The jitter allows the clients to get the data at different times, and avoids thundering herd
+        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
+        }
 
-                workerState.resetLogLevelsTimer.scheduleRecurring(0,
-                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+        workerState.refreshConnectionsTimer.scheduleRecurring(0,
+            (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
 
-                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
-                    workerState::refreshStormActive);
+        workerState.resetLogLevelsTimer.scheduleRecurring(0,
+            (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
 
-                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
-                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
-                return this;
-            };
-        });
+        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+            workerState::refreshStormActive);
 
+        setupFlushTupleTimer(topologyConf, newExecutors);
+        setupBackPressureCheckTimer(topologyConf);
+
+        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
+        return this;
+    }
+
+    private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
+        final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+        final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+        final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
+        if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
+            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
+            return;
+        }
+
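+        // Flush tuples prompt each executor to publish any partially filled output batch,
+        // so tuples are not held back indefinitely when traffic is light.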
+        workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis,
+            () -> {
+                // send flush tuple to all local executors
+                for (int i = 0; i < executors.size(); i++) {
+                    IRunningExecutor exec = executors.get(i);
+                    if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
+                        exec.publishFlushTuple();
+                    }
+                }
+            }
+        );
+        LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
+    }
+
+    private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
+        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
+        if (workerCount <= 1) {
+            LOG.info("BackPressure change checking is disabled as there is only one worker");
+            return;
+        }
+        final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
+        workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs,
+            bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus());
+        LOG.info("BackPressure status change checking will be performed every {} millis", bpCheckIntervalMs);
     }
 
     public void doRefreshLoad() {
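
Both setup helpers above boil down to recurring, millisecond-resolution timer tasks. As a rough illustration of that pattern only, here is a self-contained JDK sketch; the class name, intervals, and printed actions are hypothetical stand-ins, not Storm APIs:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class RecurringTimerSketch {
        public static void main(String[] args) throws InterruptedException {
            long flushIntervalMillis = 1;  // stand-in for topology.batch.flush.interval.millis
            long bpCheckIntervalMs = 50;   // stand-in for topology.backpressure.check.millis

            ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
            // every flush interval: ask executors to publish partially filled batches
            timer.scheduleAtFixedRate(() -> System.out.println("flush tuples"),
                flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
            // every check interval: recompute and propagate backpressure status
            timer.scheduleAtFixedRate(() -> System.out.println("refresh backpressure status"),
                bpCheckIntervalMs, bpCheckIntervalMs, TimeUnit.MILLISECONDS);

            Thread.sleep(200);   // let a few rounds fire, then stop
            timer.shutdownNow();
        }
    }
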
@@ -297,7 +314,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     public void doHeartBeat() throws IOException {
         LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
         state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
-            workerState.executors.stream()
+            workerState.localExecutors.stream()
                 .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
                 .collect(Collectors.toList()), workerState.port));
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
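
The heartbeat above amounts to writing a timestamped record to local disk and pruning records older than 60 seconds so the disk cannot fill up while the supervisor is down. A rough JDK-only sketch of that idea, with a hypothetical hb-<timestamp> file layout:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class LocalHeartbeatSketch {
        static void beat(Path dir) throws IOException {
            Files.createDirectories(dir);
            long now = System.currentTimeMillis();
            // record the newest heartbeat
            Files.write(dir.resolve("hb-" + now), Long.toString(now).getBytes());
            // prune anything older than 60 seconds, mirroring state.cleanup(60)
            try (DirectoryStream<Path> old = Files.newDirectoryStream(dir, "hb-*")) {
                for (Path p : old) {
                    long ts = Long.parseLong(p.getFileName().toString().substring(3));
                    if (now - ts > 60_000) {
                        Files.deleteIfExists(p);
                    }
                }
            }
        }
    }
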
@@ -308,11 +325,10 @@ public class Worker implements Shutdownable, DaemonCommon {
         Map<List<Integer>, ExecutorStats> stats;
         List<IRunningExecutor> executors = this.executorsAtom.get();
         if (null == executors) {
-            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
+            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
         } else {
             stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
-                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
-                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
+                .toMap(IRunningExecutor::getExecutorId, IRunningExecutor::renderStats)));
         }
         Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
         try {
@@ -356,7 +372,7 @@ public class Worker implements Shutdownable, DaemonCommon {
 
     public void checkCredentialsChanged() {
         Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
-        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
+        if (!ObjectUtils.equals(newCreds, credentialsAtom.get())) {
             // This does not have to be atomic, worst case we update when one is not needed
             AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
             for (IRunningExecutor executor : executorsAtom.get()) {
@@ -366,12 +382,6 @@ public class Worker implements Shutdownable, DaemonCommon {
         }
     }
 
-    public void checkThrottleChanged() {
-        boolean throttleOn =
-                workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged);
-        workerState.throttleOn.set(throttleOn);
-    }
-
     public void checkLogConfigChanged() {
         LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
         logConfigManager.processLogConfigChange(logConfig);
@@ -382,71 +392,8 @@ public class Worker implements Shutdownable, DaemonCommon {
         workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
     }
 
-
-    /**
-     * make a handler for the worker's send disruptor queue to
-     * check highWaterMark and lowWaterMark for backpressure.
-     */
-    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
-        return new DisruptorBackpressureCallback() {
-            @Override public void highWaterMark() throws Exception {
-                LOG.debug("worker {} transfer-queue is congested, checking backpressure state", workerState.workerId);
-                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
-            }
-
-            @Override public void lowWaterMark() throws Exception {
-                LOG.debug("worker {} transfer-queue is not congested, checking backpressure state", workerState.workerId);
-                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
-            }
-        };
-    }
-
-    /**
-     * make a handler that checks and updates worker's backpressure flag.
-     */
-    private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) {
-        final List<IRunningExecutor> executors = executorsAtom.get();
-        final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
-        return new WorkerBackpressureCallback() {
-            @Override public void onEvent(Object obj) {
-                if (null != executors) {
-                    String topologyId = workerState.topologyId;
-                    String assignmentId = workerState.assignmentId;
-                    int port = workerState.port;
-                    IStormClusterState stormClusterState = workerState.stormClusterState;
-                    long prevBackpressureTimestamp = workerState.backpressure.get();
-                    long currTimestamp = System.currentTimeMillis();
-                    long currBackpressureTimestamp = 0;
-                    // the backpressure flag is true if at least one of the disruptor queues has throttle-on
-                    boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
-                            .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
-
-                    if (backpressureFlag) {
-                        // update the backpressure timestamp every updateFreqMs ms
-                        if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) {
-                            currBackpressureTimestamp = currTimestamp;
-                        } else {
-                            currBackpressureTimestamp = prevBackpressureTimestamp;
-                        }
-                    }
-
-                    if (currBackpressureTimestamp != prevBackpressureTimestamp) {
-                        try {
-                            LOG.debug("worker backpressure timestamp changing from {} to {}",
-                                    prevBackpressureTimestamp, currBackpressureTimestamp);
-                            stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureTimestamp);
-                            // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
-                            workerState.backpressure.set(currBackpressureTimestamp);
-                        } catch (Exception ex) {
-                            LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
-                        }
-                    }
-                }
-            }
-        };
-    }
-
-    @Override public void shutdown() {
+    @Override
+    public void shutdown() {
         try {
             LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
 
@@ -466,32 +413,32 @@ public class Worker implements Shutdownable, DaemonCommon {
             // in which case it's a noop
             workerState.mqContext.term();
             LOG.info("Shutting down transfer thread");
-            workerState.transferQueue.haltWithInterrupt();
+            workerState.haltWorkerTransfer();
 
-            transferThread.interrupt();
-            transferThread.join();
-            LOG.info("Shut down transfer thread");
 
-            backpressureThread.terminate();
-            LOG.info("Shut down backpressure thread");
+            if (transferThread != null) {
+                transferThread.interrupt();
+                transferThread.join();
+                LOG.info("Shut down transfer thread");
+            }
 
             workerState.heartbeatTimer.close();
             workerState.refreshConnectionsTimer.close();
             workerState.refreshCredentialsTimer.close();
             workerState.checkForUpdatedBlobsTimer.close();
-            workerState.refreshBackpressureTimer.close();
             workerState.refreshActiveTimer.close();
             workerState.executorHeartbeatTimer.close();
             workerState.userTimer.close();
             workerState.refreshLoadTimer.close();
             workerState.resetLogLevelsTimer.close();
+            workerState.flushTupleTimer.close();
+            workerState.backPressureCheckTimer.close();
             workerState.closeResources();
 
             LOG.info("Trigger any worker shutdown hooks");
             workerState.runWorkerShutdownHooks();
 
             workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
-            workerState.stormClusterState.removeWorkerBackpressure(topologyId, assignmentId, (long) port);
             LOG.info("Disconnecting from storm cluster state context");
             workerState.stormClusterState.disconnect();
             workerState.stateStorage.close();
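
The transfer-thread teardown earlier in shutdown() follows the standard interrupt-then-join idiom: signal the loop to stop, then wait for it to exit before closing the resources it uses. A minimal, self-contained sketch (names hypothetical):

    public class InterruptJoinSketch {
        public static void main(String[] args) throws InterruptedException {
            Thread transfer = new Thread(() -> {
                // stand-in for the worker's transfer loop
                while (!Thread.currentThread().isInterrupted()) {
                    // drain outbound queues, send to remote workers, ...
                }
            }, "Worker-Transfer");
            transfer.start();
            transfer.interrupt(); // request shutdown
            transfer.join();      // block until the loop has actually exited
        }
    }
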
@@ -502,16 +449,17 @@ public class Worker implements Shutdownable, DaemonCommon {
 
     }
 
-    @Override public boolean isWaiting() {
+    @Override
+    public boolean isWaiting() {
         return workerState.heartbeatTimer.isTimerWaiting()
             && workerState.refreshConnectionsTimer.isTimerWaiting()
             && workerState.refreshLoadTimer.isTimerWaiting()
             && workerState.refreshCredentialsTimer.isTimerWaiting()
             && workerState.checkForUpdatedBlobsTimer.isTimerWaiting()
-            && workerState.refreshBackpressureTimer.isTimerWaiting()
             && workerState.refreshActiveTimer.isTimerWaiting()
             && workerState.executorHeartbeatTimer.isTimerWaiting()
-            && workerState.userTimer.isTimerWaiting();
+            && workerState.userTimer.isTimerWaiting()
+            && workerState.flushTupleTimer.isTimerWaiting();
     }
 
     public static void main(String[] args) throws Exception {