You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:47 UTC

[09/16] samza git commit: SAMZA-1406: Fix potential orphaned containers problem in stand alone

SAMZA-1406: Fix potential orphaned containers problem in stand alone


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c45c7747
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c45c7747
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c45c7747

Branch: refs/heads/master
Commit: c45c7747ae371eee11e2f41dd4e32a53b12c6c91
Parents: ebce13e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Sep 14 21:17:36 2017 -0700
Committer: navina <na...@apache.org>
Committed: Thu Sep 14 21:18:55 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ScheduleAfterDebounceTime.java     | 172 +++++++++++++------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  26 ++-
 .../samza/zk/TestScheduleAfterDebounceTime.java |  74 +++++++-
 3 files changed, 204 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 6174063..3a7dca9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -20,99 +20,157 @@
 package org.apache.samza.zk;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * This class allows scheduling a Runnable actions after some debounce time.
  * When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
- * future in a map, keyed by the action name. Here we predefine some actions, which are used in the
- * ZK based standalone app.
+ * future in a map, keyed by the action name.
  */
 public class ScheduleAfterDebounceTime {
-  public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
-  public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
+  private static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+  private static final String DEBOUNCE_THREAD_NAME_FORMAT = "debounce-thread-%d";
 
-  // Here we predefine some actions which are used in the ZK based standalone app.
-  // Action name when the JobModel version changes
-  public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
-
-  // Action name when the Processor membership changes
-  public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+  // timeout to wait for a task to complete.
+  private static final int TIMEOUT_MS = 1000 * 10;
 
   /**
-   *
-   * cleanup process is started after every new job model generation is complete.
-   * It deletes old versions of job model and the barrier.
-   * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE.
-   **/
-  public static final String ON_ZK_CLEANUP = "OnCleanUp";
+   * {@link ScheduledTaskCallback} associated with the scheduler; empty until one is set. The onError method of
+   * the callback will be invoked on the first scheduled task failure.
+   */
+  private Optional<ScheduledTaskCallback> scheduledTaskCallback = Optional.empty();
 
-  private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
+  // Responsible for scheduling delayed actions.
+  private final ScheduledExecutorService scheduledExecutorService;
 
-  private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build());
+  /**
+   * A map from actionName to {@link ScheduledFuture} of task scheduled for execution.
+   */
   private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
 
-  // Ideally, this should be only used for testing. But ZkBarrierForVersionUpgrades uses it. This needs to be fixed.
-  // TODO: Timer shouldn't be passed around the components. It should be associated with the JC or the caller of
-  // coordinationUtils.
   public ScheduleAfterDebounceTime() {
-    this.scheduledTaskFailureCallback = null;
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
+    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+  }
+
+  public void setScheduledTaskCallback(ScheduledTaskCallback scheduledTaskCallback) {
+    this.scheduledTaskCallback = Optional.ofNullable(scheduledTaskCallback);
   }
 
-  public ScheduleAfterDebounceTime(ScheduledTaskFailureCallback errorScheduledTaskFailureCallback) {
-    this.scheduledTaskFailureCallback = errorScheduledTaskFailureCallback;
+  /**
+   * Performs the following operations in sequential order.
+   * <ul>
+   *    <li> Makes best effort to cancel any existing task in task queue associated with the action.</li>
+   *    <li> Schedules the incoming action for later execution and records its future.</li>
+   * </ul>
+   *
+   * @param actionName the name of scheduleable action.
+   * @param delayInMillis the time from now to delay execution.
+   * @param runnable the action to execute.
+   */
+  public synchronized void scheduleAfterDebounceTime(String actionName, long delayInMillis, Runnable runnable) {
+    // 1. Try to cancel any existing scheduled task associated with the action.
+    tryCancelScheduledAction(actionName);
+
+    // 2. Schedule the action.
+    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), delayInMillis, TimeUnit.MILLISECONDS);
+
+    LOG.info("Scheduled action: {} to run after: {} milliseconds.", actionName, delayInMillis);
+    futureHandles.put(actionName, scheduledFuture);
   }
 
-  synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
-    // check if this action has been scheduled already
-    ScheduledFuture sf = futureHandles.get(actionName);
-    if (sf != null && !sf.isDone()) {
-      LOG.info("cancel future for " + actionName);
-      // attempt to cancel
-      if (!sf.cancel(false)) {
+  /**
+   * Stops the scheduler. After this invocation no further schedule calls will be accepted
+   * and all pending enqueued tasks will be cancelled.
+   */
+  public synchronized void stopScheduler() {
+    scheduledExecutorService.shutdownNow();
+
+    // Clear the existing future handles.
+    futureHandles.clear();
+  }
+
+  /**
+   * Tries to cancel the task that belongs to {@code actionName} submitted to the queue.
+   *
+   * @param actionName the name of action to cancel.
+   */
+  private void tryCancelScheduledAction(String actionName) {
+    ScheduledFuture scheduledFuture = futureHandles.get(actionName);
+    if (scheduledFuture != null && !scheduledFuture.isDone()) {
+      LOG.info("Attempting to cancel the future of action: {}", actionName);
+      // Attempt to cancel
+      if (!scheduledFuture.cancel(false)) {
         try {
-          sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+          scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
           // we ignore the exception
-          LOG.warn("cancel for action " + actionName + " failed with ", e);
+          LOG.warn("Cancelling the future of action: {} failed.", actionName, e);
         }
       }
       futureHandles.remove(actionName);
     }
-    // schedule a new task
-    sf = scheduledExecutorService.schedule(() -> {
-        try {
-          runnable.run();
-          LOG.debug(actionName + " completed successfully.");
-        } catch (Throwable t) {
-          LOG.error(actionName + " threw an exception.", t);
-          if (scheduledTaskFailureCallback != null) {
-            scheduledTaskFailureCallback.onError(t);
-          }
+  }
+
+  /**
+   * Decorate the executable action with exception handlers to facilitate cleanup on failures.
+   *
+   * @param actionName the name of the scheduleable action.
+   * @param runnable the action to execute.
+   * @return the executable action decorated with exception handlers.
+   */
+  private Runnable getScheduleableAction(String actionName, Runnable runnable) {
+    return () -> {
+      try {
+        runnable.run();
+        /*
+         * Expects all run() implementations <b>not to swallow the interrupts.</b>
+         * This thread is interrupted from an external source (mostly the executor service) to die.
+         */
+        if (Thread.currentThread().isInterrupted()) {
+          LOG.warn("Action: {} is interrupted.", actionName);
+          doCleanUpOnTaskException(new InterruptedException());
+        } else {
+          LOG.debug("Action: {} completed successfully.", actionName);
         }
-      },
-     debounceTimeMs,
-     TimeUnit.MILLISECONDS);
-    LOG.info("scheduled " + actionName + " in " + debounceTimeMs);
-    futureHandles.put(actionName, sf);
+      } catch (Throwable throwable) {
+        LOG.error("Execution of action: {} failed.", actionName, throwable);
+        doCleanUpOnTaskException(throwable);
+      }
+    };
   }
 
-  public void stopScheduler() {
-    // shutdown executor service
-    scheduledExecutorService.shutdown();
+  /**
+   * Handler invoked on an exception or error raised during a scheduled task execution. It performs
+   * the following operations in sequential order.
+   * <ul>
+   *   <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li>
+   *   <li> Invokes the onError handler method if taskCallback is defined.</li>
+   * </ul>
+   *
+   * @param throwable the exception or error raised during task execution.
+   */
+  private void doCleanUpOnTaskException(Throwable throwable) {
+    stopScheduler();
+
+    scheduledTaskCallback.ifPresent(callback -> callback.onError(throwable));
   }
 
-  interface ScheduledTaskFailureCallback {
+  /**
+   * A ScheduledTaskCallback::onError() is invoked on the first exception or error raised
+   * when executing a task. Provides a hook for handling failures
+   * in an asynchronous scheduled task execution.
+   */
+  interface ScheduledTaskCallback {
     void onError(Throwable throwable);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9f64b3a..2b8349c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -61,6 +61,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private static final int METADATA_CACHE_TTL_MS = 5000;
   private static final int NUM_VERSIONS_TO_LEAVE = 10;
 
+  // Debounce action name used when the JobModel version changes.
+  private static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
+
+  // Debounce action name used when the processor membership changes.
+  private static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+
+  /**
+   * Debounce action name for the cleanup process, which is started after every new job model generation is complete.
+   * It deletes old versions of job model and the barrier.
+   * How many versions to keep is controlled by {@link #NUM_VERSIONS_TO_LEAVE}.
+   */
+  private static final String ON_ZK_CLEANUP = "OnCleanUp";
+
   private final ZkUtils zkUtils;
   private final String processorId;
   private final ZkController zkController;
@@ -95,7 +108,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
-    debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+    debounceTimer = new ScheduleAfterDebounceTime();
+    debounceTimer.setScheduledTaskCallback(throwable -> {
         LOG.error("Received exception from in JobCoordinator Processing!", throwable);
         stop();
       });
@@ -157,7 +171,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   @Override
   public void onProcessorChange(List<String> processors) {
     LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
-    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs,
+    debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs,
         () -> doOnProcessorChange(processors));
   }
 
@@ -195,12 +209,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
 
-    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
+    debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
   }
 
   @Override
   public void onNewJobModelAvailable(final String version) {
-    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
+    debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () ->
       {
         LOG.info("pid=" + processorId + "new JobModel available");
         // get the new job model from ZK
@@ -273,7 +287,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
       metrics.isLeader.set(true);
       zkController.subscribeToProcessorChange();
-      debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
         {
           // actual actions to do are the same as onProcessorChange
           doOnProcessorChange(new ArrayList<>());
@@ -312,7 +326,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
           LOG.warn("Barrier for version " + version + " timed out.");
           if (zkController.isLeader()) {
             LOG.info("Leader will schedule a new job model generation");
-            debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+            debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
               {
                 // actual actions to do are the same as onProcessorChange
                 doOnProcessorChange(new ArrayList<>());

http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index d3152be..a681767 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -19,15 +19,23 @@
 
 package org.apache.samza.zk;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestScheduleAfterDebounceTime {
+  private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class);
+
   private static final long WAIT_TIME = 500;
 
+  @Rule
+  public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
+
   class TestObj {
     private volatile int i = 0;
     public void inc() {
@@ -91,8 +99,10 @@ public class TestScheduleAfterDebounceTime {
   @Test
   public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
     final CountDownLatch latch = new CountDownLatch(1);
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(e -> {
-        Assert.assertEquals(RuntimeException.class, e.getClass());
+    final Throwable[] taskCallbackException = new Throwable[1];
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    scheduledQueue.setScheduledTaskCallback(throwable -> {
+        taskCallbackException[0] = throwable;
         latch.countDown();
       });
 
@@ -107,6 +117,60 @@ public class TestScheduleAfterDebounceTime {
     boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS);
     Assert.assertTrue("Latch timed-out.", result);
     Assert.assertEquals(0, testObj.get());
+    Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
+    scheduledQueue.stopScheduler();
+  }
+
+  /**
+   * Validates that an interrupt triggered by the ExecutorService shutdown is handled by ScheduleAfterDebounceTime.
+   */
+  @Test
+  public void testStopSchedulerInvokesRegisteredCallback() throws InterruptedException {
+    final CountDownLatch hasTaskCallbackCompleted = new CountDownLatch(1);
+    final CountDownLatch hasThreadStarted = new CountDownLatch(1);
+    final CountDownLatch isSchedulerShutdownTriggered = new CountDownLatch(1);
+
+    /**
+     * Declared as a Throwable array (not Exception[]) so any Throwable can be recorded inside the lambda.
+     */
+    final Throwable[] taskCallbackException = new Throwable[1];
+
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    scheduledQueue.setScheduledTaskCallback(throwable -> {
+      /**
+       * Assertion failures in the callback don't fail the test.
+       * Record the received exception here and assert outside
+       * the callback.
+       */
+        taskCallbackException[0] = throwable;
+        hasTaskCallbackCompleted.countDown();
+      });
+
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () -> {
+        hasThreadStarted.countDown();
+        try {
+          LOG.debug("Waiting for the scheduler shutdown trigger.");
+          isSchedulerShutdownTriggered.await();
+        } catch (InterruptedException e) {
+          /**
+           * Don't swallow the exception; restore the interrupt status.
+           * Expect ScheduleAfterDebounceTime to handle this interrupt
+           * and invoke the ScheduledTaskCallback.
+           */
+          Thread.currentThread().interrupt();
+        }
+      });
+
+    // Wait for the task to run.
+    hasThreadStarted.await();
+
+    // Shutdown the scheduler and update relevant state.
     scheduledQueue.stopScheduler();
+    isSchedulerShutdownTriggered.countDown();
+
+    hasTaskCallbackCompleted.await();
+
+    // Assert on exception thrown.
+    Assert.assertEquals(InterruptedException.class, taskCallbackException[0].getClass());
   }
 }