You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/22 17:36:38 UTC
[2/4] storm git commit: [STORM-2744] Address review comments
[STORM-2744] Address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/884eb61d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/884eb61d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/884eb61d
Branch: refs/heads/master
Commit: 884eb61ddab0c3da7ef587793b8543ba0f906451
Parents: bebc09c
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 22 09:38:12 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Sep 22 09:38:46 2017 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
.../src/jvm/org/apache/storm/Config.java | 17 ++++++++
.../storm/cluster/IStormClusterState.java | 2 +-
.../storm/cluster/StormClusterStateImpl.java | 41 ++++++++++++--------
.../org/apache/storm/daemon/worker/Worker.java | 17 ++++----
.../apache/storm/daemon/worker/WorkerState.java | 6 ++-
6 files changed, 60 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..16df356 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -190,6 +190,8 @@ task.backpressure.poll.secs: 30
topology.backpressure.enable: false
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
+backpressure.znode.timeout.secs: 30
+backpressure.znode.update.freq.secs: 15
zmq.threads: 1
zmq.linger.millis: 5000
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 5623698..7149631 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -129,6 +129,23 @@ public class Config extends HashMap<String, Object> {
public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
/**
+ * How long until the backpressure znode is invalid.
+ * Staleness is measured by the timestamp stored as the znode's data, not by its ctime (creation time) or mtime (modification time).
+ * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
+
+ /**
+ * How often the data (timestamp) of the backpressure znode is updated.
+ * If the worker backpressure status (on/off) changes, the znode is updated regardless of this interval.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
+
+ /**
* A list of users that are allowed to interact with the topology. To use this set
* nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 3ece640..674865b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -101,7 +101,7 @@ public interface IStormClusterState {
public void workerBackpressure(String stormId, String node, Long port, long timestamp);
- public boolean topologyBackpressure(String stormId, Runnable callback);
+ public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
public void setupBackpressure(String stormId);
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 19ee169..4d7e2a5 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -424,13 +424,14 @@ public class StormClusterStateImpl implements IStormClusterState {
}
/**
- * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
- * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
- *
- * @param stormId
- * @param node
- * @param port
- * @param timestamp
+ * If znode exists and timestamp is 0, delete;
+ * if exists and timestamp is larger than 0, update the timestamp;
+ * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
+ * if not exists and timestamp is 0, do nothing.
+ * @param stormId The topology Id
+ * @param node The node id
+ * @param port The port number
+ * @param timestamp The backpressure timestamp; 0 turns off the worker backpressure.
*/
@Override
public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
@@ -439,6 +440,9 @@ public class StormClusterStateImpl implements IStormClusterState {
if (existed) {
if (timestamp == 0) {
stateStorage.delete_node(path);
+ } else {
+ byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+ stateStorage.set_data(path, data, acls);
}
} else {
if (timestamp > 0) {
@@ -451,14 +455,15 @@ public class StormClusterStateImpl implements IStormClusterState {
/**
* Check whether a topology is in throttle-on status or not:
* if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
- * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+ * But if the backpressure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
* This will prevent the spouts from getting stuck indefinitely if something wrong happens.
- * @param stormId
- * @param callback
- * @return
+ * @param stormId The topology Id
+ * @param timeoutMs How long until the backpressure znode is invalid.
+ * @param callback The callback function
+ * @return True if the backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
*/
@Override
- public boolean topologyBackpressure(String stormId, Runnable callback) {
+ public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) {
if (callback != null) {
backPressureCallback.put(stormId, callback);
}
@@ -466,10 +471,14 @@ public class StormClusterStateImpl implements IStormClusterState {
long mostRecentTimestamp = 0;
if(stateStorage.node_exists(path, false)) {
List<String> children = stateStorage.get_children(path, callback != null);
- mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
- .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
- }
- boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+ mostRecentTimestamp = children.stream()
+ .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+ .filter(data -> data != null)
+ .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+ .max()
+ .orElse(0);
+ }
+ boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
LOG.debug("topology backpressure is {}", ret ? "on" : "off");
return ret;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index d878fc8..519e7ce 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -95,7 +95,8 @@ public class Worker implements Shutdownable, DaemonCommon {
private AtomicReference<List<IRunningExecutor>> executorsAtom;
private Thread transferThread;
private WorkerBackpressureThread backpressureThread;
-
+ // How long until the backpressure znode is invalid.
+ private long backpressureZnodeTimeoutMs;
private AtomicReference<Credentials> credentialsAtom;
private Subject subject;
private Collection<IAutoCredentials> autoCreds;
@@ -152,6 +153,7 @@ public class Worker implements Shutdownable, DaemonCommon {
}
autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
+ backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
@@ -224,11 +226,11 @@ public class Worker implements Shutdownable, DaemonCommon {
workerState.transferQueue
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
- WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
+ WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
+ stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
@@ -366,7 +368,7 @@ public class Worker implements Shutdownable, DaemonCommon {
}
public void checkThrottleChanged() {
- boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
+ boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged);
workerState.throttleOn.set(throttleOn);
}
@@ -402,8 +404,9 @@ public class Worker implements Shutdownable, DaemonCommon {
/**
* make a handler that checks and updates worker's backpressure flag
*/
- private WorkerBackpressureCallback mkBackpressureHandler() {
+ private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) {
final List<IRunningExecutor> executors = executorsAtom.get();
+ final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
return new WorkerBackpressureCallback() {
@Override public void onEvent(Object obj) {
if (null != executors) {
@@ -419,8 +422,8 @@ public class Worker implements Shutdownable, DaemonCommon {
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
if (backpressureFlag) {
- // update the backpressure timestamp every 15 seconds
- if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+ // update the backpressure timestamp every updateFreqMs ms
+ if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) {
currBackpressureTimestamp = currTimestamp;
} else {
currBackpressureTimestamp = prevBackpressureTimestamp;
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f2c09b1..d679ee8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -213,6 +213,8 @@ public class WorkerState {
final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
// Whether this worker is going slow. 0 indicates the backpressure is off
final AtomicLong backpressure = new AtomicLong(0);
+ // How long until the backpressure znode is invalid.
+ final long backpressureZnodeTimeoutMs;
// If the transfer queue is backed-up
final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
// a trigger for synchronization with executors
@@ -298,6 +300,7 @@ public class WorkerState {
}
Collections.sort(taskIds);
this.topologyConf = topologyConf;
+ this.backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
@@ -333,6 +336,7 @@ public class WorkerState {
LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
}
this.drainer = new TransferDrainer();
+
}
public void refreshConnections() {
@@ -441,7 +445,7 @@ public class WorkerState {
}
public void refreshThrottle() {
- boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
+ boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
this.throttleOn.set(backpressure);
}