You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/01 14:36:04 UTC
[1/3] storm git commit: Adding backpressure timeout,
backpressure znodes cleanup,
Do not delete backpressure ephemeral node frequently
Repository: storm
Updated Branches:
refs/heads/1.x-branch 49c2fc39f -> 33f543cf6
Adding backpressure timeout, backpressure znodes cleanup, Do not delete backpressure ephemeral node frequently
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd04a556
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd04a556
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd04a556
Branch: refs/heads/1.x-branch
Commit: dd04a5563317fa6f57d3d7ec32190940b98454d7
Parents: 58ae04b
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Mon Jan 22 15:47:42 2018 -0500
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Mon Jan 22 15:47:52 2018 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
storm-core/src/clj/org/apache/storm/cluster.clj | 63 ++++++++++++++----
.../src/clj/org/apache/storm/daemon/worker.clj | 68 ++++++++++++--------
storm-core/src/jvm/org/apache/storm/Config.java | 17 +++++
.../test/clj/org/apache/storm/cluster_test.clj | 15 +++++
5 files changed, 123 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f89211b..2bd7855 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -194,6 +194,8 @@ task.backpressure.poll.secs: 30
topology.backpressure.enable: false
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
+backpressure.znode.timeout.secs: 30
+backpressure.znode.update.freq.secs: 15
zmq.threads: 1
zmq.linger.millis: 5000
http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 810b3c3..eafa40b 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -18,7 +18,8 @@
(:import [org.apache.zookeeper.data Stat ACL Id]
[org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
LogConfig ProfileAction ProfileRequest NodeInfo]
- [java.io Serializable])
+ [java.io Serializable]
+ [java.nio ByteBuffer])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
(:import [org.apache.curator.framework CuratorFramework])
(:import [org.apache.storm.utils Utils])
@@ -80,7 +81,7 @@
(remove-worker-heartbeat! [this storm-id node port])
(supervisor-heartbeat! [this supervisor-id info])
(worker-backpressure! [this storm-id node port info])
- (topology-backpressure [this storm-id callback])
+ (topology-backpressure [this storm-id timeout-ms callback])
(setup-backpressure! [this storm-id])
(remove-backpressure! [this storm-id])
(remove-worker-backpressure! [this storm-id node port])
@@ -172,6 +173,10 @@
[storm-id node port]
(str (backpressure-storm-root storm-id) "/" node "-" port))
+(defn backpressure-full-path ;; joins the topology's backpressure root and a child znode name with "/"
+ [storm-id short-path] ;; short-path: child name relative to that root (max-timestamp passes get_children results here)
+ (str (backpressure-storm-root storm-id) "/" short-path))
+
(defn error-storm-root
[storm-id]
(str ERRORS-SUBTREE "/" storm-id))
@@ -242,6 +247,20 @@
:stats (get executor-stats t)}})))
(into {}))))
+
+(defn max-timestamp
+ "Returns the most recent timestamp (e.g. those set by worker-backpressure!) stored in the
+ backpressure znodes named by paths; 0 when paths is empty or no znode holds a full 8-byte long."
+ [cluster-state storm-id paths]
+ (reduce (fn [acc path]
+ (let [data (.get_data cluster-state (backpressure-full-path storm-id path) false) ;; third arg false: read without a watch
+ timestamp (if (and data (>= (count data) 8)) ;; nil or short payload cannot hold a long: count it as 0 instead of throwing BufferUnderflowException
+ (.. (ByteBuffer/wrap data) (getLong))
+ 0)]
+ (Math/max acc timestamp)))
+ 0
+ paths))
+
;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
(defnk mk-storm-cluster-state
[cluster-state-spec :acls nil :context (ClusterStateContext.)]
@@ -483,27 +502,37 @@
(log-warn-error e "Could not teardown heartbeats for " storm-id))))
(worker-backpressure!
- [this storm-id node port on?]
- "if znode exists and to be not on?, delete; if exists and on?, do nothing;
- if not exists and to be on?, create; if not exists and not on?, do nothing"
+ [this storm-id node port timestamp]
+ "If znode exists and timestamp is non-positive, ignore;
+ if exists and timestamp is larger than 0, update the timestamp;
+ if not exists and timestamp is larger than 0, create the znode and set the timestamp;
+ if not exists and timestamp is non-positive, do nothing."
(let [path (backpressure-path storm-id node port)
existed (.node_exists cluster-state path false)]
(if existed
- (if (not on?)
- (.delete_node cluster-state path)) ;; delete the znode since the worker is not congested
- (if on?
- (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested
+ (if-not (<= timestamp 0) ;; znode already exists: only refresh its timestamp, it is never deleted here
+ (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))]
+ (.set_data cluster-state path bytes acls)))
+ (when (and timestamp (> timestamp 0)) ;; 0 is truthy in Clojure, so test the sign explicitly: no znode for a non-positive timestamp
+ (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))]
+ (.set_ephemeral_node cluster-state path bytes acls)))))) ;; create the znode since worker is congested
(topology-backpressure
- [this storm-id callback]
+ [this storm-id timeout-ms callback] ;; timeout-ms: maximum age, in milliseconds, of the newest worker timestamp for throttle to count as on
"if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
+ But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
+ This will prevent the spouts from getting stuck indefinitely if something wrong happens.
The backpressure/storm-id dir may not exist if nimbus has shutdown the topology"
(when callback
(swap! backpressure-callback assoc storm-id callback))
(let [path (backpressure-storm-root storm-id)
children (if (.node_exists cluster-state path false)
- (.get_children cluster-state path (not-nil? callback))) ]
- (> (count children) 0)))
+ (.get_children cluster-state path (not-nil? callback)))
+ most-recent-backpressure (max-timestamp cluster-state storm-id children) ;; 0 when the dir is missing/empty or no child carries data
+ current-time (System/currentTimeMillis) ;; NOTE(review): compared against timestamps taken from the workers' clocks - assumes they are roughly in sync
+ ret (> timeout-ms (- current-time most-recent-backpressure))] ;; on only while the newest timestamp is younger than timeout-ms
+ (log-debug "topology backpressure is " (if ret "on" "off"))
+ ret))
(setup-backpressure!
[this storm-id]
@@ -511,14 +540,20 @@
(remove-backpressure!
[this storm-id]
- (.delete_node cluster-state (backpressure-storm-root storm-id)))
+ (try-cause ;; best-effort removal of the topology's backpressure root: a KeeperException is logged, not rethrown
+ (.delete_node cluster-state (backpressure-storm-root storm-id))
+ (catch KeeperException e
+ (log-warn-error e "Could not teardown backpressure for " storm-id))))
(remove-worker-backpressure!
[this storm-id node port]
(let [path (backpressure-path storm-id node port)
existed (.node_exists cluster-state path false)]
(if existed
- (.delete_node cluster-state (backpressure-path storm-id node port)))))
+ (try-cause ;; a KeeperException from the delete (e.g. the znode vanished after the exists check) is logged, not rethrown
+ (.delete_node cluster-state (backpressure-path storm-id node port))
+ (catch KeeperException e
+ (log-warn-error e "Could not teardown backpressure for " storm-id))))))
(teardown-topology-errors!
[this storm-id]
http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 6626272..633a61d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -131,30 +131,39 @@
(let [tuple (.getTuple addressed-tuple)]
(.serialize serializer tuple))))
-(defn- mk-backpressure-handler [executors]
- "make a handler that checks and updates worker's backpressure flag"
- (disruptor/worker-backpressure-handler
- (fn [worker]
- (let [storm-id (:storm-id worker)
- assignment-id (:assignment-id worker)
- port (:port worker)
- storm-cluster-state (:storm-cluster-state worker)
- prev-backpressure-flag @(:backpressure worker)
- ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
- curr-backpressure-flag (if executors
- (or (.getThrottleOn (:transfer-queue worker))
- (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
- prev-backpressure-flag)]
- ;; update the worker's backpressure flag to zookeeper only when it has changed
- (when (not= prev-backpressure-flag curr-backpressure-flag)
- (try
- (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
- (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-flag)
- ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
- (reset! (:backpressure worker) curr-backpressure-flag)
- (catch Exception exc
- (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
- ))))
+(defn should-trigger-backpressure [executors worker] ;; truthy when the worker's transfer queue or any executor reports backpressure
+ (or (.getThrottleOn (:transfer-queue worker))
+ (reduce #(or %1 %2) false (map #(.get-backpressure-flag %1) executors)))) ;; init value: without it reduce calls the 2-arity fn with no args on empty executors and throws
+
+(defn- mk-backpressure-handler [executors topo-conf]
+ "make a handler that checks and updates worker's backpressure timestamp"
+ (let [update-freq-ms (* (topo-conf BACKPRESSURE-ZNODE-UPDATE-FREQ-SECS) 1000)] ;; configured in seconds, compared in milliseconds below
+ (disruptor/worker-backpressure-handler
+ (if executors
+ (fn [worker]
+ (let [storm-id (:storm-id worker)
+ assignment-id (:assignment-id worker)
+ port (:port worker)
+ storm-cluster-state (:storm-cluster-state worker)
+ prev-backpressure-timestamp @(:backpressure worker) ;; value kept in the worker's :backpressure atom, reset only after a zk update succeeds
+ curr-timestamp (System/currentTimeMillis)
+ ;; backpressure counts as on if the transfer queue or at least one executor reports throttle-on
+ curr-backpressure-timestamp (if (should-trigger-backpressure executors worker)
+ ;; while on, take a new timestamp only when more than update-freq-ms milliseconds have passed since the previous one
+ (if (> (- curr-timestamp (or prev-backpressure-timestamp 0)) update-freq-ms)
+ curr-timestamp
+ prev-backpressure-timestamp)
+ 0)] ;; 0 stands for backpressure off
+ ;; update the worker's backpressure timestamp to zookeeper only when it has changed
+ (when (not= prev-backpressure-timestamp curr-backpressure-timestamp)
+ (try
+ (log-debug "worker backpressure timestamp changing from " prev-backpressure-timestamp " to " curr-backpressure-timestamp)
+ (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-timestamp)
+ ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+ (reset! (:backpressure worker) curr-backpressure-timestamp)
+ (catch Exception exc
+ (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))))
+ (fn [workers]))))) ;; executors is nil: the handler does nothing
(defn- mk-disruptor-backpressure-handler [worker]
"make a handler for the worker's send disruptor queue to
@@ -317,7 +326,7 @@
:transfer-fn (mk-transfer-fn <>)
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
- :backpressure (atom false) ;; whether this worker is going slow
+ :backpressure (atom 0) ;; whether this worker is going slow. non-positive means turning off backpressure
:backpressure-trigger (Object.) ;; a trigger for synchronization with executors
:throttle-on (atom false) ;; whether throttle is activated for spouts
)))
@@ -647,15 +656,18 @@
_ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
(.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
(.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)))
- backpressure-handler (mk-backpressure-handler @executors)
+ backpressure-handler (mk-backpressure-handler @executors storm-conf)
backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.start backpressure-thread))
+ ;; this callback is registered as a zk watch on topology's backpressure directory
+ ;; which makes sure that the topology's backpressure status is updated to the worker's throttle-on
+ backpressure-znode-timeout-ms (* (storm-conf BACKPRESSURE-ZNODE-TIMEOUT-SECS) 1000)
topology-backpressure-callback (fn cb [& ignored]
- (let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)]
+ (let [throttle-on (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms cb)]
(reset! (:throttle-on worker) throttle-on)))
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (.topology-backpressure storm-cluster-state storm-id topology-backpressure-callback))
+ (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms topology-backpressure-callback))
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " assignment-id " " port)
http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6b0c868..11f980e 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1561,6 +1561,23 @@ public class Config extends HashMap<String, Object> {
public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
/**
+ * How long until the backpressure znode is invalid.
+ * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
+ * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
+
+ /**
+ * How often the data (timestamp) of the backpressure znode is refreshed while the worker stays under backpressure.
+ * When the worker goes from off to on the znode is written right away; when it goes off, its timestamp simply stops being refreshed.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
+
+ /**
* A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format).
* Each listed class will be routed cluster related metrics data.
* Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus.
http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 55b686e..cb5f064 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -319,3 +319,18 @@
(mk-storm-cluster-state {})
(verify-call-times-for mk-distributed-cluster-state 1)
(verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil))))
+
+(deftest test-cluster-state-backpressure
+ (testing "Test that we can get topology backpressure."
+ (stubbing [zk/mkdirs nil
+ zk/mk-client (reify CuratorFramework (^void close [this] nil))
+ mk-distributed-cluster-state (reify ClusterState
+ (get_data [this path watch?] (byte-array 10)) ;; all-zero bytes decode to timestamp 0
+ (register [this callback] nil)
+ (mkdirs [this path acls] nil)
+ (node_exists [this path watch?]
+ (log-message "Running node_exists.") true)
+ (get_children [this path watch?] '("/foo/bar")))]
+ (let [cluster-state (mk-storm-cluster-state {})]
+ (.get_data (mk-distributed-cluster-state) "/foo/bar" false)
+ (is (false? (topology-backpressure cluster-state "" 30 nil))))))) ;; timestamp 0 is far older than the 30 ms timeout, so throttle must be off
[2/3] storm git commit: Use 1.7 compatible Long size
Posted by ka...@apache.org.
Use 1.7 compatible Long size
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14cb3a94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14cb3a94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14cb3a94
Branch: refs/heads/1.x-branch
Commit: 14cb3a94a65136d016da25973d82e7177b2538ce
Parents: dd04a55
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Jan 23 12:39:46 2018 -0500
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Jan 23 12:50:53 2018 -0500
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/cluster.clj | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/14cb3a94/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index eafa40b..731a0b9 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -511,10 +511,10 @@
existed (.node_exists cluster-state path false)]
(if existed
(if-not (<= timestamp 0)
- (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))]
+ (let [bytes (.. (ByteBuffer/allocate (/ (Long/SIZE) 8)) (putLong timestamp) (array))]
(.set_data cluster-state path bytes acls)))
(when timestamp
- (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))]
+ (let [bytes (.. (ByteBuffer/allocate (/ (Long/SIZE) 8)) (putLong timestamp) (array))]
(.set_ephemeral_node cluster-state path bytes acls)))))) ;; create the znode since worker is congested
(topology-backpressure
[3/3] storm git commit: Merge branch 'storm2873-1.x-branch' of
https://github.com/kishorvpatil/incubator-storm into STORM-2873-1.x-merge
Posted by ka...@apache.org.
Merge branch 'storm2873-1.x-branch' of https://github.com/kishorvpatil/incubator-storm into STORM-2873-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33f543cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33f543cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33f543cf
Branch: refs/heads/1.x-branch
Commit: 33f543cf6211f8109d968bb27fecd40ae1f3a9d9
Parents: 49c2fc3 14cb3a9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Feb 1 23:35:54 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Feb 1 23:35:54 2018 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
storm-core/src/clj/org/apache/storm/cluster.clj | 63 ++++++++++++++----
.../src/clj/org/apache/storm/daemon/worker.clj | 68 ++++++++++++--------
storm-core/src/jvm/org/apache/storm/Config.java | 17 +++++
.../test/clj/org/apache/storm/cluster_test.clj | 15 +++++
5 files changed, 123 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/33f543cf/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/33f543cf/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/33f543cf/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------