You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/02/26 00:13:59 UTC
[4/7] storm git commit: Ignore ZK Exception in onEvent
Ignore ZK Exception in onEvent
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26453a36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26453a36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26453a36
Branch: refs/heads/master
Commit: 26453a36d60798d203721f308f16b87d25143485
Parents: a1ef75f
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Feb 25 11:41:25 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Feb 25 11:41:25 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/worker.clj | 5 ++++-
.../apache/storm/utils/WorkerBackpressureThread.java | 15 +++++++--------
2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/26453a36/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 5b22823..fd0c98e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -155,7 +155,10 @@
;; update the worker's backpressure flag to zookeeper only when it has changed
(log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
(when (not= prev-backpressure-flag @(:backpressure worker))
- (.workerBackpressure storm-cluster-state storm-id assignment-id port @(:backpressure worker)))
+ (try
+ (.workerBackpressure storm-cluster-state storm-id assignment-id port @(:backpressure worker))
+ (catch Exception exc
+ (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
))))
(defn- mk-disruptor-backpressure-handler [worker]
http://git-wip-us.apache.org/repos/asf/storm/blob/26453a36/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
index 6b25502..f3b5a66 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
@@ -48,8 +48,10 @@ public class WorkerBackpressureThread extends Thread {
}
}
- public void terminate() {
+ public void terminate() throws InterruptedException {
running = false;
+ interrupt();
+ join();
}
public void run() {
@@ -60,7 +62,7 @@ public class WorkerBackpressureThread extends Thread {
}
callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
} catch (InterruptedException interEx) {
- LOG.info("WorkerBackpressureThread gets interrupted! Ignoring Exception: ", interEx);
+ // ignored, we are shutting down.
}
}
}
@@ -70,11 +72,8 @@ class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHa
private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
@Override
public void uncaughtException(Thread t, Throwable e) {
- try {
- Utils.handleUncaughtException(e);
- } catch (Error error) {
- LOG.info("Received error in WorkerBackpressureThread.. terminating the worker...");
- Runtime.getRuntime().exit(1);
- }
+ // note that exception that happens during connecting to ZK has been ignored in the callback implementation
+ LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
+ Runtime.getRuntime().exit(1);
}
}