You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/06/02 23:13:56 UTC

[helix] branch master updated: Gracefully handle interruptions in the Helix logic. (#1779)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 16117fe  Gracefully handle interruptions in the Helix logic. (#1779)
16117fe is described below

commit 16117fe66e6f3f3a20f39c54d1f7cda8c209d5b5
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Wed Jun 2 16:13:50 2021 -0700

    Gracefully handle interruptions in the Helix logic. (#1779)
    
    This PR addresses the problem that interrupt signals are swallowed by catch blocks, so the interrupted thread never actually stops. This may cause resource leaks or race conditions.
---
 .../pipeline/AbstractAsyncBaseStage.java           | 24 +++----
 .../helix/manager/zk/CallbackEventExecutor.java    |  2 +-
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |  5 +-
 .../java/org/apache/helix/task/TaskRunner.java     |  9 +--
 .../controller/stages/TestAsyncBaseStage.java      | 82 ++++++++++++++++++++++
 .../zkclient/callback/ZkAsyncCallbacks.java        |  1 +
 6 files changed, 104 insertions(+), 19 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
index f36725b..18c278b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
@@ -38,19 +38,19 @@ public abstract class AbstractAsyncBaseStage extends AbstractBaseStage {
       throw new StageException("No async worker found for " + taskType);
     }
 
-    worker.queueEvent(taskType, new Runnable() {
-      @Override
-      public void run() {
-        long startTimestamp = System.currentTimeMillis();
-        logger.info("START AsyncProcess: {}", taskType);
-        try {
-          execute(event);
-        } catch (Exception e) {
-          logger.error("Failed to process {} asynchronously", taskType, e);
-        }
-        long endTimestamp = System.currentTimeMillis();
-        logger.info("END AsyncProcess: {}, took {} ms", taskType, endTimestamp - startTimestamp);
+    worker.queueEvent(taskType, () -> {
+      long startTimestamp = System.currentTimeMillis();
+      logger.info("START AsyncProcess: {}", taskType);
+      try {
+        execute(event);
+      } catch (InterruptedException e) {
+        logger.warn("Process {} has been interrupted", taskType, e);
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        logger.error("Failed to process {} asynchronously", taskType, e);
       }
+      long endTimestamp = System.currentTimeMillis();
+      logger.info("END AsyncProcess: {}, took {} ms", taskType, endTimestamp - startTimestamp);
     });
     logger.info("Submitted asynchronous {} task to worker", taskType);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
index 90719d1..1bf3721 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
@@ -126,4 +126,4 @@ public class CallbackEventExecutor {
     CallbackEventThreadPoolFactory.unregisterEventProcessor(_manager.hashCode());
     _threadPoolExecutor = null;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index b3df401..e165199 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -767,7 +767,9 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         }
       }
     } catch (InterruptedException e) {
-      LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
+      // The InterruptedException may come from lockInterruptibly().
+      // If it fails to get the lock, throw exception so none of the initialization will be done.
+      throw new HelixException("Current thread is interrupted when acquiring lock. ", e);
     } finally {
       _eventLock.unlock();
     }
@@ -815,7 +817,6 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     }
 
     LOG.debug("Stop ZkCacheEventThread...done");
-
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 794182d..2507990 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.SystemPropertyKeys;
@@ -134,7 +135,7 @@ public class TaskRunner implements Runnable {
   }
 
   /**
-   * Waits uninterruptibly until the task has started.
+   * Waits until the task has started.
    */
   public void waitTillStarted() {
     synchronized (_startedSync) {
@@ -142,7 +143,7 @@ public class TaskRunner implements Runnable {
         try {
           _startedSync.wait();
         } catch (InterruptedException e) {
-          LOG.warn(
+          throw new HelixException(
               String.format("Interrupted while waiting for task %s to start.", _taskPartition), e);
         }
       }
@@ -150,7 +151,7 @@ public class TaskRunner implements Runnable {
   }
 
   /**
-   * Waits uninterruptibly until the task has finished, either normally or due to an
+   * Waits until the task has finished, either normally or due to an
    * error/cancellation..
    */
   public TaskResult waitTillDone() {
@@ -159,7 +160,7 @@ public class TaskRunner implements Runnable {
         try {
           _doneSync.wait();
         } catch (InterruptedException e) {
-          LOG.warn(
+          throw new HelixException(
               String.format("Interrupted while waiting for task %s to complete.", _taskPartition),
               e);
         }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java
new file mode 100644
index 0000000..e5c0efe
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java
@@ -0,0 +1,82 @@
+package org.apache.helix.controller.stages;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/** Checks that async stage tasks finish normally and stop when their worker is shut down. */
+public class TestAsyncBaseStage {
+  private static final AsyncWorkerType DEFAULT_WORKER_TYPE =
+      AsyncWorkerType.ExternalViewComputeWorker;
+  @Test
+  public void testAsyncStageCleanup() throws Exception {
+    BlockingAsyncStage blockingAsyncStage = new BlockingAsyncStage();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool =
+        new HashMap<>();
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>("ClusterName", DEFAULT_WORKER_TYPE.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    asyncFIFOWorkerPool.put(DEFAULT_WORKER_TYPE, worker);
+
+    ClusterEvent event = new ClusterEvent("ClusterName", ClusterEventType.OnDemandRebalance);
+    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), asyncFIFOWorkerPool);
+
+    // Test normal execute case: the task blocks in execute() until proceed() releases it.
+    blockingAsyncStage.process(event);
+    Assert.assertTrue(TestHelper.verify(() -> blockingAsyncStage._isStarted, 500));
+    Assert.assertFalse(blockingAsyncStage._isFinished);
+    blockingAsyncStage.proceed();
+    Assert.assertTrue(TestHelper.verify(() -> blockingAsyncStage._isFinished, 500));
+    blockingAsyncStage.reset();
+
+    // Test interruption case: the task must never reach the end of execute() after shutdown.
+    blockingAsyncStage.process(event);
+    Assert.assertTrue(TestHelper.verify(() -> blockingAsyncStage._isStarted, 500));
+    Assert.assertFalse(blockingAsyncStage._isFinished);
+    worker.shutdown();
+    Assert.assertFalse(TestHelper.verify(() -> blockingAsyncStage._isFinished, 1000));
+    Assert.assertFalse(worker.isAlive());
+    blockingAsyncStage.reset();
+  }
+
+  private static class BlockingAsyncStage extends AbstractAsyncBaseStage {
+    public volatile boolean _isFinished = false;
+    public volatile boolean _isStarted = false;
+    // Flags above are volatile: execute() sets them on the worker thread, the test polls them.
+    private CountDownLatch _countDownLatch = new CountDownLatch(1);
+
+    public void reset() {
+      _isFinished = false;
+      _isStarted = false;
+      _countDownLatch = new CountDownLatch(1);
+    }
+
+    public void proceed() {
+      _countDownLatch.countDown();
+    }
+
+    @Override
+    public AsyncWorkerType getAsyncWorkerType() {
+      return DEFAULT_WORKER_TYPE;
+    }
+
+    @Override
+    public void execute(ClusterEvent event) throws Exception {
+      _isStarted = true;
+      _countDownLatch.await(); // throws InterruptedException if the waiting thread is interrupted
+      _isFinished = true;
+    }
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 7f662cf..506d234 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -244,6 +244,7 @@ public class ZkAsyncCallbacks {
         }
       } catch (InterruptedException e) {
         LOG.error("Interrupted waiting for success", e);
+        return false;
       }
       return true;
     }