You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/22 17:36:37 UTC
[1/4] storm git commit: [STORM-2744] add in restart timeout for
backpressure
Repository: storm
Updated Branches:
refs/heads/master d2e221ae8 -> db510ae58
[STORM-2744] add in restart timeout for backpressure
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bebc09cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bebc09cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bebc09cf
Branch: refs/heads/master
Commit: bebc09cfb744f760bf14395513145165cd214e77
Parents: 50d55a9
Author: Ethan Li <et...@gmail.com>
Authored: Wed Sep 20 09:33:29 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Thu Sep 21 18:36:03 2017 +0000
----------------------------------------------------------------------
.../org/apache/storm/cluster/ClusterUtils.java | 10 +++++
.../storm/cluster/IStormClusterState.java | 2 +-
.../storm/cluster/StormClusterStateImpl.java | 33 ++++++++------
.../org/apache/storm/daemon/worker/Worker.java | 45 ++++++++++++--------
.../apache/storm/daemon/worker/WorkerState.java | 4 +-
5 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 43b0574..0dc3e20 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -144,6 +144,16 @@ public class ClusterUtils {
return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
}
+ /**
+ * Get the backpressure znode full path.
+ * @param stormId The topology id
+ * @param shortPath A string in the form of "node-port"
+ * @return The backpressure znode path
+ */
+ public static String backpressurePath(String stormId, String shortPath) {
+ return backpressureStormRoot(stormId) + ZK_SEPERATOR + shortPath;
+ }
+
public static String errorStormRoot(String stormId) {
return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 704c9e5..3ece640 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -99,7 +99,7 @@ public interface IStormClusterState {
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
- public void workerBackpressure(String stormId, String node, Long port, boolean on);
+ public void workerBackpressure(String stormId, String node, Long port, long timestamp);
public boolean topologyBackpressure(String stormId, Runnable callback);
http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 343b0e6..19ee169 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -32,6 +32,7 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -423,24 +424,26 @@ public class StormClusterStateImpl implements IStormClusterState {
}
/**
- * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
+ * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
+ * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
*
* @param stormId
* @param node
* @param port
- * @param on
+ * @param timestamp
*/
@Override
- public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+ public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
String path = ClusterUtils.backpressurePath(stormId, node, port);
boolean existed = stateStorage.node_exists(path, false);
if (existed) {
- if (on == false)
+ if (timestamp == 0) {
stateStorage.delete_node(path);
-
+ }
} else {
- if (on == true) {
- stateStorage.set_ephemeral_node(path, null, acls);
+ if (timestamp > 0) {
+ byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+ stateStorage.set_ephemeral_node(path, data, acls);
}
}
}
@@ -448,7 +451,8 @@ public class StormClusterStateImpl implements IStormClusterState {
/**
* Check whether a topology is in throttle-on status or not:
* if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
- *
+ * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+ * This will prevent the spouts from getting stuck indefinitely if something wrong happens.
* @param stormId
* @param callback
* @return
@@ -459,14 +463,15 @@ public class StormClusterStateImpl implements IStormClusterState {
backPressureCallback.put(stormId, callback);
}
String path = ClusterUtils.backpressureStormRoot(stormId);
- List<String> childrens = null;
+ long mostRecentTimestamp = 0;
if(stateStorage.node_exists(path, false)) {
- childrens = stateStorage.get_children(path, callback != null);
- } else {
- childrens = new ArrayList<>();
+ List<String> children = stateStorage.get_children(path, callback != null);
+ mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+ .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
}
- return childrens.size() > 0;
-
+ boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+ LOG.debug("topology backpressure is {}", ret ? "on" : "off");
+ return ret;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 9e7bd0b..d878fc8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -406,25 +406,36 @@ public class Worker implements Shutdownable, DaemonCommon {
final List<IRunningExecutor> executors = executorsAtom.get();
return new WorkerBackpressureCallback() {
@Override public void onEvent(Object obj) {
- String topologyId = workerState.topologyId;
- String assignmentId = workerState.assignmentId;
- int port = workerState.port;
- IStormClusterState stormClusterState = workerState.stormClusterState;
- boolean prevBackpressureFlag = workerState.backpressure.get();
- boolean currBackpressureFlag = prevBackpressureFlag;
if (null != executors) {
- currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
- .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
- }
+ String topologyId = workerState.topologyId;
+ String assignmentId = workerState.assignmentId;
+ int port = workerState.port;
+ IStormClusterState stormClusterState = workerState.stormClusterState;
+ long prevBackpressureTimestamp = workerState.backpressure.get();
+ long currTimestamp = System.currentTimeMillis();
+ long currBackpressureTimestamp = 0;
+ // the backpressure flag is true if at least one of the disruptor queues has throttle-on
+ boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
+ .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
+
+ if (backpressureFlag) {
+ // update the backpressure timestamp every 15 seconds
+ if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+ currBackpressureTimestamp = currTimestamp;
+ } else {
+ currBackpressureTimestamp = prevBackpressureTimestamp;
+ }
+ }
- if (currBackpressureFlag != prevBackpressureFlag) {
- try {
- LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag);
- stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag);
- // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
- workerState.backpressure.set(currBackpressureFlag);
- } catch (Exception ex) {
- LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+ if (currBackpressureTimestamp != prevBackpressureTimestamp) {
+ try {
+ LOG.debug("worker backpressure timestamp changing from {} to {}", prevBackpressureTimestamp, currBackpressureTimestamp);
+ stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureTimestamp);
+ // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+ workerState.backpressure.set(currBackpressureTimestamp);
+ } catch (Exception ex) {
+ LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 33ea579..f2c09b1 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -211,8 +211,8 @@ public class WorkerState {
final Map<String, Object> userSharedResources;
final LoadMapping loadMapping;
final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
- // Whether this worker is going slow
- final AtomicBoolean backpressure = new AtomicBoolean(false);
+ // Whether this worker is going slow. 0 indicates the backpressure is off
+ final AtomicLong backpressure = new AtomicLong(0);
// If the transfer queue is backed-up
final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
// a trigger for synchronization with executors
[2/4] storm git commit: [STORM-2744] Address review comments
Posted by bo...@apache.org.
[STORM-2744] Address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/884eb61d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/884eb61d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/884eb61d
Branch: refs/heads/master
Commit: 884eb61ddab0c3da7ef587793b8543ba0f906451
Parents: bebc09c
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 22 09:38:12 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Sep 22 09:38:46 2017 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
.../src/jvm/org/apache/storm/Config.java | 17 ++++++++
.../storm/cluster/IStormClusterState.java | 2 +-
.../storm/cluster/StormClusterStateImpl.java | 41 ++++++++++++--------
.../org/apache/storm/daemon/worker/Worker.java | 17 ++++----
.../apache/storm/daemon/worker/WorkerState.java | 6 ++-
6 files changed, 60 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..16df356 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -190,6 +190,8 @@ task.backpressure.poll.secs: 30
topology.backpressure.enable: false
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
+backpressure.znode.timeout.secs: 30
+backpressure.znode.update.freq.secs: 15
zmq.threads: 1
zmq.linger.millis: 5000
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 5623698..7149631 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -129,6 +129,23 @@ public class Config extends HashMap<String, Object> {
public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
/**
+ * How long until the backpressure znode is invalid.
+ * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
+ * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
+
+ /**
+ * How often the data (timestamp) of the backpressure znode is updated.
+ * But if the worker backpressure status (on/off) changes, the znode will be updated anyway.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
+
+ /**
* A list of users that are allowed to interact with the topology. To use this set
* nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 3ece640..674865b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -101,7 +101,7 @@ public interface IStormClusterState {
public void workerBackpressure(String stormId, String node, Long port, long timestamp);
- public boolean topologyBackpressure(String stormId, Runnable callback);
+ public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
public void setupBackpressure(String stormId);
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 19ee169..4d7e2a5 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -424,13 +424,14 @@ public class StormClusterStateImpl implements IStormClusterState {
}
/**
- * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
- * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
- *
- * @param stormId
- * @param node
- * @param port
- * @param timestamp
+ * If znode exists and timestamp is 0, delete;
+ * if exists and timestamp is larger than 0, update the timestamp;
+ * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
+ * if not exists and timestamp is 0, do nothing.
+ * @param stormId The topology Id
+ * @param node The node id
+ * @param port The port number
+ * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure
*/
@Override
public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
@@ -439,6 +440,9 @@ public class StormClusterStateImpl implements IStormClusterState {
if (existed) {
if (timestamp == 0) {
stateStorage.delete_node(path);
+ } else {
+ byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+ stateStorage.set_data(path, data, acls);
}
} else {
if (timestamp > 0) {
@@ -451,14 +455,15 @@ public class StormClusterStateImpl implements IStormClusterState {
/**
* Check whether a topology is in throttle-on status or not:
* if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
- * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+ * But if the backpressure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
* This will prevent the spouts from getting stuck indefinitely if something wrong happens.
- * @param stormId
- * @param callback
- * @return
+ * @param stormId The topology Id
+ * @param timeoutMs How long until the backpressure znode is invalid.
+ * @param callback The callback function
+ * @return True if the backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
*/
@Override
- public boolean topologyBackpressure(String stormId, Runnable callback) {
+ public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) {
if (callback != null) {
backPressureCallback.put(stormId, callback);
}
@@ -466,10 +471,14 @@ public class StormClusterStateImpl implements IStormClusterState {
long mostRecentTimestamp = 0;
if(stateStorage.node_exists(path, false)) {
List<String> children = stateStorage.get_children(path, callback != null);
- mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
- .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
- }
- boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+ mostRecentTimestamp = children.stream()
+ .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+ .filter(data -> data != null)
+ .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+ .max()
+ .orElse(0);
+ }
+ boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
LOG.debug("topology backpressure is {}", ret ? "on" : "off");
return ret;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index d878fc8..519e7ce 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -95,7 +95,8 @@ public class Worker implements Shutdownable, DaemonCommon {
private AtomicReference<List<IRunningExecutor>> executorsAtom;
private Thread transferThread;
private WorkerBackpressureThread backpressureThread;
-
+ // How long until the backpressure znode is invalid.
+ private long backpressureZnodeTimeoutMs;
private AtomicReference<Credentials> credentialsAtom;
private Subject subject;
private Collection<IAutoCredentials> autoCreds;
@@ -152,6 +153,7 @@ public class Worker implements Shutdownable, DaemonCommon {
}
autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
+ backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
@@ -224,11 +226,11 @@ public class Worker implements Shutdownable, DaemonCommon {
workerState.transferQueue
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
- WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
+ WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
+ stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
@@ -366,7 +368,7 @@ public class Worker implements Shutdownable, DaemonCommon {
}
public void checkThrottleChanged() {
- boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
+ boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged);
workerState.throttleOn.set(throttleOn);
}
@@ -402,8 +404,9 @@ public class Worker implements Shutdownable, DaemonCommon {
/**
* make a handler that checks and updates worker's backpressure flag
*/
- private WorkerBackpressureCallback mkBackpressureHandler() {
+ private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) {
final List<IRunningExecutor> executors = executorsAtom.get();
+ final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
return new WorkerBackpressureCallback() {
@Override public void onEvent(Object obj) {
if (null != executors) {
@@ -419,8 +422,8 @@ public class Worker implements Shutdownable, DaemonCommon {
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
if (backpressureFlag) {
- // update the backpressure timestamp every 15 seconds
- if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+ // update the backpressure timestamp every updateFreqMs ms
+ if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) {
currBackpressureTimestamp = currTimestamp;
} else {
currBackpressureTimestamp = prevBackpressureTimestamp;
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f2c09b1..d679ee8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -213,6 +213,8 @@ public class WorkerState {
final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
// Whether this worker is going slow. 0 indicates the backpressure is off
final AtomicLong backpressure = new AtomicLong(0);
+ // How long until the backpressure znode is invalid.
+ final long backpressureZnodeTimeoutMs;
// If the transfer queue is backed-up
final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
// a trigger for synchronization with executors
@@ -298,6 +300,7 @@ public class WorkerState {
}
Collections.sort(taskIds);
this.topologyConf = topologyConf;
+ this.backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
@@ -333,6 +336,7 @@ public class WorkerState {
LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
}
this.drainer = new TransferDrainer();
+
}
public void refreshConnections() {
@@ -441,7 +445,7 @@ public class WorkerState {
}
public void refreshThrottle() {
- boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
+ boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
this.throttleOn.set(backpressure);
}
[3/4] storm git commit: [STORM-2744] Address review comments
Posted by bo...@apache.org.
[STORM-2744] Address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de0118d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de0118d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de0118d3
Branch: refs/heads/master
Commit: de0118d3c80d5bcf67aa20fb4b15743bc7fc2a5b
Parents: 884eb61
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 22 10:14:49 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Sep 22 10:14:49 2017 -0500
----------------------------------------------------------------------
.../org/apache/storm/cluster/StormClusterStateImpl.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/de0118d3/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 4d7e2a5..7a52d86 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -424,21 +424,21 @@ public class StormClusterStateImpl implements IStormClusterState {
}
/**
- * If znode exists and timestamp is 0, delete;
+ * If znode exists and timestamp is non-positive, delete;
* if exists and timestamp is larger than 0, update the timestamp;
* if not exists and timestamp is larger than 0, create the znode and set the timestamp;
- * if not exists and timestamp is 0, do nothing.
+ * if not exists and timestamp is non-positive, do nothing.
* @param stormId The topology Id
* @param node The node id
* @param port The port number
- * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure
+ * @param timestamp The backpressure timestamp. Non-positive means turning off the worker backpressure
*/
@Override
public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
String path = ClusterUtils.backpressurePath(stormId, node, port);
boolean existed = stateStorage.node_exists(path, false);
if (existed) {
- if (timestamp == 0) {
+ if (timestamp <= 0) {
stateStorage.delete_node(path);
} else {
byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
@@ -478,7 +478,7 @@ public class StormClusterStateImpl implements IStormClusterState {
.max()
.orElse(0);
}
- boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
+ boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
LOG.debug("topology backpressure is {}", ret ? "on" : "off");
return ret;
}
[4/4] storm git commit: Merge branch 'STORM-2744' of
https://github.com/Ethanlm/storm into STORM-2744
Posted by bo...@apache.org.
Merge branch 'STORM-2744' of https://github.com/Ethanlm/storm into STORM-2744
STORM-2744: add in restart timeout for backpressure
This closes #2338
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/db510ae5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/db510ae5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/db510ae5
Branch: refs/heads/master
Commit: db510ae588f6cf2547c8c4c4b220b53f9db912d0
Parents: d2e221a de0118d
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 22 12:10:07 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 22 12:10:07 2017 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
.../src/jvm/org/apache/storm/Config.java | 17 ++++++
.../org/apache/storm/cluster/ClusterUtils.java | 10 ++++
.../storm/cluster/IStormClusterState.java | 4 +-
.../storm/cluster/StormClusterStateImpl.java | 60 ++++++++++++--------
.../org/apache/storm/daemon/worker/Worker.java | 58 ++++++++++++-------
.../apache/storm/daemon/worker/WorkerState.java | 10 +++-
7 files changed, 111 insertions(+), 50 deletions(-)
----------------------------------------------------------------------