You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/22 17:36:37 UTC

[1/4] storm git commit: [STORM-2744] add in restart timeout for backpressure

Repository: storm
Updated Branches:
  refs/heads/master d2e221ae8 -> db510ae58


[STORM-2744] add in restart timeout for backpressure


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bebc09cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bebc09cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bebc09cf

Branch: refs/heads/master
Commit: bebc09cfb744f760bf14395513145165cd214e77
Parents: 50d55a9
Author: Ethan Li <et...@gmail.com>
Authored: Wed Sep 20 09:33:29 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Thu Sep 21 18:36:03 2017 +0000

----------------------------------------------------------------------
 .../org/apache/storm/cluster/ClusterUtils.java  | 10 +++++
 .../storm/cluster/IStormClusterState.java       |  2 +-
 .../storm/cluster/StormClusterStateImpl.java    | 33 ++++++++------
 .../org/apache/storm/daemon/worker/Worker.java  | 45 ++++++++++++--------
 .../apache/storm/daemon/worker/WorkerState.java |  4 +-
 5 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 43b0574..0dc3e20 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -144,6 +144,16 @@ public class ClusterUtils {
         return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
     }
 
+    /**
+     * Get the backpressure znode full path.
+     * @param stormId The topology id
+     * @param shortPath A string in the form of "node-port"
+     * @return The backpressure znode path
+     */
+    public static String backpressurePath(String stormId, String shortPath) {
+        return backpressureStormRoot(stormId) + ZK_SEPERATOR + shortPath;
+    }
+
     public static String errorStormRoot(String stormId) {
         return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 704c9e5..3ece640 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -99,7 +99,7 @@ public interface IStormClusterState {
 
     public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
 
-    public void workerBackpressure(String stormId, String node, Long port, boolean on);
+    public void workerBackpressure(String stormId, String node, Long port, long timestamp);
 
     public boolean topologyBackpressure(String stormId, Runnable callback);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 343b0e6..19ee169 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -32,6 +32,7 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -423,24 +424,26 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
+     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
+     * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
      * 
      * @param stormId
      * @param node
      * @param port
-     * @param on
+     * @param timestamp
      */
     @Override
-    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+    public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
         String path = ClusterUtils.backpressurePath(stormId, node, port);
         boolean existed = stateStorage.node_exists(path, false);
         if (existed) {
-            if (on == false)
+            if (timestamp == 0) {
                 stateStorage.delete_node(path);
-
+            }
         } else {
-            if (on == true) {
-                stateStorage.set_ephemeral_node(path, null, acls);
+            if (timestamp > 0) {
+                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+                stateStorage.set_ephemeral_node(path, data, acls);
             }
         }
     }
@@ -448,7 +451,8 @@ public class StormClusterStateImpl implements IStormClusterState {
     /**
      * Check whether a topology is in throttle-on status or not:
      * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
-     * 
+     * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+     * This will prevent the spouts from getting stuck indefinitely if something wrong happens.
      * @param stormId
      * @param callback
      * @return
@@ -459,14 +463,15 @@ public class StormClusterStateImpl implements IStormClusterState {
             backPressureCallback.put(stormId, callback);
         }
         String path = ClusterUtils.backpressureStormRoot(stormId);
-        List<String> childrens = null;
+        long mostRecentTimestamp = 0;
         if(stateStorage.node_exists(path, false)) {
-            childrens = stateStorage.get_children(path, callback != null);
-        } else {
-            childrens = new ArrayList<>();
+            List<String> children = stateStorage.get_children(path, callback != null);
+            mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+                    .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
         }
-        return childrens.size() > 0;
-
+        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+        LOG.debug("topology backpressure is {}", ret ? "on" : "off");
+        return ret;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 9e7bd0b..d878fc8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -406,25 +406,36 @@ public class Worker implements Shutdownable, DaemonCommon {
         final List<IRunningExecutor> executors = executorsAtom.get();
         return new WorkerBackpressureCallback() {
             @Override public void onEvent(Object obj) {
-                String topologyId = workerState.topologyId;
-                String assignmentId = workerState.assignmentId;
-                int port = workerState.port;
-                IStormClusterState stormClusterState = workerState.stormClusterState;
-                boolean prevBackpressureFlag = workerState.backpressure.get();
-                boolean currBackpressureFlag = prevBackpressureFlag;
                 if (null != executors) {
-                    currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
-                        .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
-                }
+                    String topologyId = workerState.topologyId;
+                    String assignmentId = workerState.assignmentId;
+                    int port = workerState.port;
+                    IStormClusterState stormClusterState = workerState.stormClusterState;
+                    long prevBackpressureTimestamp = workerState.backpressure.get();
+                    long currTimestamp = System.currentTimeMillis();
+                    long currBackpressureTimestamp = 0;
+                    // the backpressure flag is true if at least one of the disruptor queues has throttle-on
+                    boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
+                            .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
+
+                    if (backpressureFlag) {
+                        // update the backpressure timestamp every 15 seconds
+                        if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+                            currBackpressureTimestamp = currTimestamp;
+                        } else {
+                            currBackpressureTimestamp = prevBackpressureTimestamp;
+                        }
+                    }
 
-                if (currBackpressureFlag != prevBackpressureFlag) {
-                    try {
-                        LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag);
-                        stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag);
-                        // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
-                        workerState.backpressure.set(currBackpressureFlag);
-                    } catch (Exception ex) {
-                        LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+                    if (currBackpressureTimestamp != prevBackpressureTimestamp) {
+                        try {
+                            LOG.debug("worker backpressure timestamp changing from {} to {}", prevBackpressureTimestamp, currBackpressureTimestamp);
+                            stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureTimestamp);
+                            // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+                            workerState.backpressure.set(currBackpressureTimestamp);
+                        } catch (Exception ex) {
+                            LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 33ea579..f2c09b1 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -211,8 +211,8 @@ public class WorkerState {
     final Map<String, Object> userSharedResources;
     final LoadMapping loadMapping;
     final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
-    // Whether this worker is going slow
-    final AtomicBoolean backpressure = new AtomicBoolean(false);
+    // Whether this worker is going slow. 0 indicates the backpressure is off
+    final AtomicLong backpressure = new AtomicLong(0);
     // If the transfer queue is backed-up
     final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
     // a trigger for synchronization with executors


[2/4] storm git commit: [STORM-2744] Address review comments

Posted by bo...@apache.org.
[STORM-2744] Address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/884eb61d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/884eb61d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/884eb61d

Branch: refs/heads/master
Commit: 884eb61ddab0c3da7ef587793b8543ba0f906451
Parents: bebc09c
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 22 09:38:12 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Sep 22 09:38:46 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +
 .../src/jvm/org/apache/storm/Config.java        | 17 ++++++++
 .../storm/cluster/IStormClusterState.java       |  2 +-
 .../storm/cluster/StormClusterStateImpl.java    | 41 ++++++++++++--------
 .../org/apache/storm/daemon/worker/Worker.java  | 17 ++++----
 .../apache/storm/daemon/worker/WorkerState.java |  6 ++-
 6 files changed, 60 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..16df356 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -190,6 +190,8 @@ task.backpressure.poll.secs: 30
 topology.backpressure.enable: false
 backpressure.disruptor.high.watermark: 0.9
 backpressure.disruptor.low.watermark: 0.4
+backpressure.znode.timeout.secs: 30
+backpressure.znode.update.freq.secs: 15
 
 zmq.threads: 1
 zmq.linger.millis: 5000

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 5623698..7149631 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -129,6 +129,23 @@ public class Config extends HashMap<String, Object> {
     public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
 
     /**
+     * How long until the backpressure znode is invalid.
+     * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
+     * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
+
+    /**
+     * How often the data (timestamp) of the backpressure znode is updated, in seconds.
+     * But if the worker backpressure status (on/off) changes, the znode will be updated anyway.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
+
+    /**
      * A list of users that are allowed to interact with the topology.  To use this set
      * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 3ece640..674865b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -101,7 +101,7 @@ public interface IStormClusterState {
 
     public void workerBackpressure(String stormId, String node, Long port, long timestamp);
 
-    public boolean topologyBackpressure(String stormId, Runnable callback);
+    public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
 
     public void setupBackpressure(String stormId);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 19ee169..4d7e2a5 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -424,13 +424,14 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
-     * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
-     * 
-     * @param stormId
-     * @param node
-     * @param port
-     * @param timestamp
+     * If znode exists and timestamp is 0, delete;
+     * if exists and timestamp is larger than 0, update the timestamp;
+     * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
+     * if not exists and timestamp is 0, do nothing.
+     * @param stormId The topology Id
+     * @param node The node id
+     * @param port The port number
+     * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure
      */
     @Override
     public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
@@ -439,6 +440,9 @@ public class StormClusterStateImpl implements IStormClusterState {
         if (existed) {
             if (timestamp == 0) {
                 stateStorage.delete_node(path);
+            } else {
+                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+                stateStorage.set_data(path, data, acls);
             }
         } else {
             if (timestamp > 0) {
@@ -451,14 +455,15 @@ public class StormClusterStateImpl implements IStormClusterState {
     /**
      * Check whether a topology is in throttle-on status or not:
      * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
-     * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+     * But if the backpressure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
      * This will prevent the spouts from getting stuck indefinitely if something wrong happens.
-     * @param stormId
-     * @param callback
-     * @return
+     * @param stormId The topology Id
+     * @param timeoutMs How long until the backpressure znode is invalid.
+     * @param callback The callback function
+     * @return True if the backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
      */
     @Override
-    public boolean topologyBackpressure(String stormId, Runnable callback) {
+    public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) {
         if (callback != null) {
             backPressureCallback.put(stormId, callback);
         }
@@ -466,10 +471,14 @@ public class StormClusterStateImpl implements IStormClusterState {
         long mostRecentTimestamp = 0;
         if(stateStorage.node_exists(path, false)) {
             List<String> children = stateStorage.get_children(path, callback != null);
-            mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
-                    .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
-        }
-        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+            mostRecentTimestamp = children.stream()
+                    .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+                    .filter(data -> data != null)
+                    .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+                    .max()
+                    .orElse(0);
+        }
+        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
         LOG.debug("topology backpressure is {}", ret ? "on" : "off");
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index d878fc8..519e7ce 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -95,7 +95,8 @@ public class Worker implements Shutdownable, DaemonCommon {
     private AtomicReference<List<IRunningExecutor>> executorsAtom;
     private Thread transferThread;
     private WorkerBackpressureThread backpressureThread;
-
+    // How long until the backpressure znode is invalid.
+    private long backpressureZnodeTimeoutMs;
     private AtomicReference<Credentials> credentialsAtom;
     private Subject subject;
     private Collection<IAutoCredentials> autoCreds;
@@ -152,6 +153,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         }
         autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
         subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
+        backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
 
         Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
             @Override public Object run() throws Exception {
@@ -224,11 +226,11 @@ public class Worker implements Shutdownable, DaemonCommon {
                 workerState.transferQueue
                     .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
 
-                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
+                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
                 backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
                 if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
                     backpressureThread.start();
-                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
+                    stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
                     
                     int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
                     workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
@@ -366,7 +368,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     }
 
     public void checkThrottleChanged() {
-        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
+        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged);
         workerState.throttleOn.set(throttleOn);
     }
 
@@ -402,8 +404,9 @@ public class Worker implements Shutdownable, DaemonCommon {
     /**
      * make a handler that checks and updates worker's backpressure flag
      */
-    private WorkerBackpressureCallback mkBackpressureHandler() {
+    private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) {
         final List<IRunningExecutor> executors = executorsAtom.get();
+        final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
         return new WorkerBackpressureCallback() {
             @Override public void onEvent(Object obj) {
                 if (null != executors) {
@@ -419,8 +422,8 @@ public class Worker implements Shutdownable, DaemonCommon {
                             .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
 
                     if (backpressureFlag) {
-                        // update the backpressure timestamp every 15 seconds
-                        if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+                        // update the backpressure timestamp every updateFreqMs ms
+                        if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) {
                             currBackpressureTimestamp = currTimestamp;
                         } else {
                             currBackpressureTimestamp = prevBackpressureTimestamp;

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f2c09b1..d679ee8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -213,6 +213,8 @@ public class WorkerState {
     final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
     // Whether this worker is going slow. 0 indicates the backpressure is off
     final AtomicLong backpressure = new AtomicLong(0);
+    // How long until the backpressure znode is invalid.
+    final long backpressureZnodeTimeoutMs;
     // If the transfer queue is backed-up
     final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
     // a trigger for synchronization with executors
@@ -298,6 +300,7 @@ public class WorkerState {
         }
         Collections.sort(taskIds);
         this.topologyConf = topologyConf;
+        this.backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
         this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
         this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
         this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
@@ -333,6 +336,7 @@ public class WorkerState {
             LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
         }
         this.drainer = new TransferDrainer();
+
     }
 
     public void refreshConnections() {
@@ -441,7 +445,7 @@ public class WorkerState {
     }
 
     public void refreshThrottle() {
-        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
+        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
         this.throttleOn.set(backpressure);
     }
 


[3/4] storm git commit: [STORM-2744] Address review comments

Posted by bo...@apache.org.
[STORM-2744] Address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de0118d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de0118d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de0118d3

Branch: refs/heads/master
Commit: de0118d3c80d5bcf67aa20fb4b15743bc7fc2a5b
Parents: 884eb61
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 22 10:14:49 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Sep 22 10:14:49 2017 -0500

----------------------------------------------------------------------
 .../org/apache/storm/cluster/StormClusterStateImpl.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/de0118d3/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 4d7e2a5..7a52d86 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -424,21 +424,21 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * If znode exists and timestamp is 0, delete;
+     * If znode exists and timestamp is non-positive, delete;
      * if exists and timestamp is larger than 0, update the timestamp;
      * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
-     * if not exists and timestamp is 0, do nothing.
+     * if not exists and timestamp is non-positive, do nothing.
      * @param stormId The topology Id
      * @param node The node id
      * @param port The port number
-     * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure
+     * @param timestamp The backpressure timestamp. Non-positive means turning off the worker backpressure
      */
     @Override
     public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
         String path = ClusterUtils.backpressurePath(stormId, node, port);
         boolean existed = stateStorage.node_exists(path, false);
         if (existed) {
-            if (timestamp == 0) {
+            if (timestamp <= 0) {
                 stateStorage.delete_node(path);
             } else {
                 byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
@@ -478,7 +478,7 @@ public class StormClusterStateImpl implements IStormClusterState {
                     .max()
                     .orElse(0);
         }
-        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
+        boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
         LOG.debug("topology backpressure is {}", ret ? "on" : "off");
         return ret;
     }


[4/4] storm git commit: Merge branch 'STORM-2744' of https://github.com/Ethanlm/storm into STORM-2744

Posted by bo...@apache.org.
Merge branch 'STORM-2744' of https://github.com/Ethanlm/storm into STORM-2744

STORM-2744: add in restart timeout for backpressure

This closes #2338


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/db510ae5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/db510ae5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/db510ae5

Branch: refs/heads/master
Commit: db510ae588f6cf2547c8c4c4b220b53f9db912d0
Parents: d2e221a de0118d
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 22 12:10:07 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 22 12:10:07 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +
 .../src/jvm/org/apache/storm/Config.java        | 17 ++++++
 .../org/apache/storm/cluster/ClusterUtils.java  | 10 ++++
 .../storm/cluster/IStormClusterState.java       |  4 +-
 .../storm/cluster/StormClusterStateImpl.java    | 60 ++++++++++++--------
 .../org/apache/storm/daemon/worker/Worker.java  | 58 ++++++++++++-------
 .../apache/storm/daemon/worker/WorkerState.java | 10 +++-
 7 files changed, 111 insertions(+), 50 deletions(-)
----------------------------------------------------------------------