You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/02/26 00:13:59 UTC

[4/7] storm git commit: Ignore ZK Exception in onEvent

Ignore ZK Exception in onEvent


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26453a36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26453a36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26453a36

Branch: refs/heads/master
Commit: 26453a36d60798d203721f308f16b87d25143485
Parents: a1ef75f
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Feb 25 11:41:25 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Feb 25 11:41:25 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj       |  5 ++++-
 .../apache/storm/utils/WorkerBackpressureThread.java | 15 +++++++--------
 2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/26453a36/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 5b22823..fd0c98e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -155,7 +155,10 @@
         ;; update the worker's backpressure flag to zookeeper only when it has changed
         (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
         (when (not= prev-backpressure-flag @(:backpressure worker))
-          (.workerBackpressure storm-cluster-state storm-id assignment-id port @(:backpressure worker)))
+          (try
+            (.workerBackpressure storm-cluster-state storm-id assignment-id port @(:backpressure worker))
+            (catch Exception exc
+              (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
         ))))
 
 (defn- mk-disruptor-backpressure-handler [worker]

http://git-wip-us.apache.org/repos/asf/storm/blob/26453a36/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
index 6b25502..f3b5a66 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
@@ -48,8 +48,10 @@ public class WorkerBackpressureThread extends Thread {
         }
     }
 
-    public void terminate() {
+    public void terminate() throws InterruptedException {
         running = false;
+        interrupt();
+        join();
     }
 
     public void run() {
@@ -60,7 +62,7 @@ public class WorkerBackpressureThread extends Thread {
                 }
                 callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
             } catch (InterruptedException interEx) {
-                LOG.info("WorkerBackpressureThread gets interrupted! Ignoring Exception: ", interEx);
+                // ignored, we are shutting down.
             }
         }
     }
@@ -70,11 +72,8 @@ class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHa
     private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
     @Override
     public void uncaughtException(Thread t, Throwable e) {
-        try {
-            Utils.handleUncaughtException(e);
-        } catch (Error error) {
-            LOG.info("Received error in WorkerBackpressureThread.. terminating the worker...");
-            Runtime.getRuntime().exit(1);
-        }
+        // note that exception that happens during connecting to ZK has been ignored in the callback implementation
+        LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
+        Runtime.getRuntime().exit(1);
     }
 }