You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/22 17:36:38 UTC

[2/4] storm git commit: [STORM-2744] Address review comments

[STORM-2744] Address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/884eb61d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/884eb61d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/884eb61d

Branch: refs/heads/master
Commit: 884eb61ddab0c3da7ef587793b8543ba0f906451
Parents: bebc09c
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 22 09:38:12 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Sep 22 09:38:46 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +
 .../src/jvm/org/apache/storm/Config.java        | 17 ++++++++
 .../storm/cluster/IStormClusterState.java       |  2 +-
 .../storm/cluster/StormClusterStateImpl.java    | 41 ++++++++++++--------
 .../org/apache/storm/daemon/worker/Worker.java  | 17 ++++----
 .../apache/storm/daemon/worker/WorkerState.java |  6 ++-
 6 files changed, 60 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..16df356 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -190,6 +190,8 @@ task.backpressure.poll.secs: 30
 topology.backpressure.enable: false
 backpressure.disruptor.high.watermark: 0.9
 backpressure.disruptor.low.watermark: 0.4
+backpressure.znode.timeout.secs: 30
+backpressure.znode.update.freq.secs: 15
 
 zmq.threads: 1
 zmq.linger.millis: 5000

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 5623698..7149631 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -129,6 +129,23 @@ public class Config extends HashMap<String, Object> {
     public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
 
     /**
+     * How long until the backpressure znode is invalid.
+     * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
+     * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
+
+    /**
+     * How often the data (timestamp) of the backpressure znode is updated.
+     * If the worker backpressure status (on/off) changes, the znode is updated regardless of this interval.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
+
+    /**
      * A list of users that are allowed to interact with the topology.  To use this set
      * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 3ece640..674865b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -101,7 +101,7 @@ public interface IStormClusterState {
 
     public void workerBackpressure(String stormId, String node, Long port, long timestamp);
 
-    public boolean topologyBackpressure(String stormId, Runnable callback);
+    public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
 
     public void setupBackpressure(String stormId);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 19ee169..4d7e2a5 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -424,13 +424,14 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
-     * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
-     * 
-     * @param stormId
-     * @param node
-     * @param port
-     * @param timestamp
+     * If znode exists and timestamp is 0, delete;
+     * if exists and timestamp is larger than 0, update the timestamp;
+     * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
+     * if not exists and timestamp is 0, do nothing.
+     * @param stormId The topology Id
+     * @param node The node id
+     * @param port The port number
+     * @param timestamp The backpressure timestamp. A value of 0 turns off the worker backpressure.
      */
     @Override
     public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
@@ -439,6 +440,9 @@ public class StormClusterStateImpl implements IStormClusterState {
         if (existed) {
             if (timestamp == 0) {
                 stateStorage.delete_node(path);
+            } else {
+                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+                stateStorage.set_data(path, data, acls);
             }
         } else {
             if (timestamp > 0) {
@@ -451,14 +455,15 @@ public class StormClusterStateImpl implements IStormClusterState {
     /**
      * Check whether a topology is in throttle-on status or not:
      * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
-     * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+     * But if the backpressure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
      * This will prevent the spouts from getting stuck indefinitely if something wrong happens.
-     * @param stormId
-     * @param callback
-     * @return
+     * @param stormId The topology Id
+     * @param timeoutMs How long until the backpressure znode is invalid.
+     * @param callback The callback function
+     * @return True if the backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
      */
     @Override
-    public boolean topologyBackpressure(String stormId, Runnable callback) {
+    public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) {
         if (callback != null) {
             backPressureCallback.put(stormId, callback);
         }
@@ -466,10 +471,14 @@ public class StormClusterStateImpl implements IStormClusterState {
         long mostRecentTimestamp = 0;
         if(stateStorage.node_exists(path, false)) {
             List<String> children = stateStorage.get_children(path, callback != null);
-            mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
-                    .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
-        }
-        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+            mostRecentTimestamp = children.stream()
+                    .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+                    .filter(data -> data != null)
+                    .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+                    .max()
+                    .orElse(0);
+        }
+        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
         LOG.debug("topology backpressure is {}", ret ? "on" : "off");
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index d878fc8..519e7ce 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -95,7 +95,8 @@ public class Worker implements Shutdownable, DaemonCommon {
     private AtomicReference<List<IRunningExecutor>> executorsAtom;
     private Thread transferThread;
     private WorkerBackpressureThread backpressureThread;
-
+    // How long until the backpressure znode is invalid.
+    private long backpressureZnodeTimeoutMs;
     private AtomicReference<Credentials> credentialsAtom;
     private Subject subject;
     private Collection<IAutoCredentials> autoCreds;
@@ -152,6 +153,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         }
         autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
         subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
+        backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
 
         Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
             @Override public Object run() throws Exception {
@@ -224,11 +226,11 @@ public class Worker implements Shutdownable, DaemonCommon {
                 workerState.transferQueue
                     .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
 
-                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
+                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
                 backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
                 if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
                     backpressureThread.start();
-                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
+                    stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
                     
                     int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
                     workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
@@ -366,7 +368,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     }
 
     public void checkThrottleChanged() {
-        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
+        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged);
         workerState.throttleOn.set(throttleOn);
     }
 
@@ -402,8 +404,9 @@ public class Worker implements Shutdownable, DaemonCommon {
     /**
      * make a handler that checks and updates worker's backpressure flag
      */
-    private WorkerBackpressureCallback mkBackpressureHandler() {
+    private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) {
         final List<IRunningExecutor> executors = executorsAtom.get();
+        final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
         return new WorkerBackpressureCallback() {
             @Override public void onEvent(Object obj) {
                 if (null != executors) {
@@ -419,8 +422,8 @@ public class Worker implements Shutdownable, DaemonCommon {
                             .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
 
                     if (backpressureFlag) {
-                        // update the backpressure timestamp every 15 seconds
-                        if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+                        // update the backpressure timestamp every updateFreqMs ms
+                        if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) {
                             currBackpressureTimestamp = currTimestamp;
                         } else {
                             currBackpressureTimestamp = prevBackpressureTimestamp;

http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f2c09b1..d679ee8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -213,6 +213,8 @@ public class WorkerState {
     final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
     // Whether this worker is going slow. 0 indicates the backpressure is off
     final AtomicLong backpressure = new AtomicLong(0);
+    // How long until the backpressure znode is invalid.
+    final long backpressureZnodeTimeoutMs;
     // If the transfer queue is backed-up
     final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
     // a trigger for synchronization with executors
@@ -298,6 +300,7 @@ public class WorkerState {
         }
         Collections.sort(taskIds);
         this.topologyConf = topologyConf;
+        this.backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
         this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
         this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
         this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
@@ -333,6 +336,7 @@ public class WorkerState {
             LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
         }
         this.drainer = new TransferDrainer();
+
     }
 
     public void refreshConnections() {
@@ -441,7 +445,7 @@ public class WorkerState {
     }
 
     public void refreshThrottle() {
-        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
+        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
         this.throttleOn.set(backpressure);
     }