You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/17 23:02:28 UTC

samza git commit: SAMZA-1664: ZkJobCoordinator stability fixes.

Repository: samza
Updated Branches:
  refs/heads/master 12968cfb6 -> d7a071b34


SAMZA-1664: ZkJobCoordinator stability fixes.

Issues fixed:
* Handle job coordinator shutdown gracefully in case of unclean container shutdowns.
* Fix the zookeeper session handling logic.
* Fix the forever retry timeout in ZkClient re-connect.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #476 from shanthoosh/pullInK2Changes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d7a071b3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d7a071b3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d7a071b3

Branch: refs/heads/master
Commit: d7a071b346a2a6d31e90fc038bd30c7ba8cb7ef1
Parents: 12968cf
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Apr 17 16:02:16 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Apr 17 16:02:16 2018 -0700

----------------------------------------------------------------------
 .../samza/zk/ScheduleAfterDebounceTime.java     |  35 +++++--
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 100 +++++++++++++++----
 .../samza/zk/TestScheduleAfterDebounceTime.java |  53 ----------
 3 files changed, 102 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index b53d245..f6f2dc9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -20,12 +20,12 @@
 package org.apache.samza.zk;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import org.slf4j.Logger;
@@ -55,7 +55,8 @@ public class ScheduleAfterDebounceTime {
   /**
    * A map from actionName to {@link ScheduledFuture} of task scheduled for execution.
    */
-  private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
+  private final Map<String, ScheduledFuture> futureHandles = new ConcurrentHashMap<>();
+  private boolean isShuttingDown;
 
   public ScheduleAfterDebounceTime() {
     ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
@@ -93,12 +94,24 @@ public class ScheduleAfterDebounceTime {
    * and all pending enqueued tasks will be cancelled.
    */
   public synchronized void stopScheduler() {
-    LOG.info("Stopping Scheduler");
+    if (isShuttingDown) {
+      LOG.debug("Debounce timer shutdown is already in progress!");
+      return;
+    }
+
+    isShuttingDown = true;
+    LOG.info("Shutting down debounce timer!");
 
-    scheduledExecutorService.shutdownNow();
+    // changing it back to use shutdown instead to prevent interruptions on the active task
+    scheduledExecutorService.shutdown();
+
+    // should clear out the future handles as well
+    futureHandles.keySet()
+        .forEach(this::tryCancelScheduledAction);
+  }
 
-    // Clear the existing future handles.
-    futureHandles.clear();
+  public synchronized void cancelAction(String action) {
+    this.tryCancelScheduledAction(action);
   }
 
   /**
@@ -144,22 +157,22 @@ public class ScheduleAfterDebounceTime {
         } else {
           LOG.debug("Action: {} completed successfully.", actionName);
         }
-      } catch (Throwable t) {
-        LOG.error("Execution of action: {} failed.", actionName, t);
-        doCleanUpOnTaskException(t);
+      } catch (Throwable throwable) {
+        LOG.error("Execution of action: {} failed.", actionName, throwable);
+        doCleanUpOnTaskException(throwable);
       }
     };
   }
 
   /**
-   * Handler method to invoke on a throwable during an scheduled task execution and which
+   * Handler method to invoke on a exception during an scheduled task execution and which
    * the following operations in sequential order.
    * <ul>
    *   <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li>
    *   <li> Invokes the onError handler method if taskCallback is defined.</li>
    * </ul>
    *
-   * @param throwable the throwable that happened during task execution.
+   * @param throwable the exception happened during task execution.
    */
   private void doCleanUpOnTaskException(Throwable throwable) {
     stopScheduler();

http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 407291a..0e0f815 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -58,6 +58,8 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.zookeeper.Watcher.Event.KeeperState.*;
+
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -97,6 +99,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobModel newJobModel;
   private int debounceTimeMs;
   private boolean hasCreatedStreams = false;
+  private boolean initiatedShutdown = false;
   private String cachedJobModelVersion = null;
   private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
 
@@ -121,7 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
     debounceTimer = new ScheduleAfterDebounceTime();
     debounceTimer.setScheduledTaskCallback(throwable -> {
-        LOG.error("Received exception from in JobCoordinator Processing!", throwable);
+        LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
         stop();
       });
     systemAdmins = new SystemAdmins(config);
@@ -137,19 +140,50 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   @Override
   public synchronized void stop() {
-    if (coordinatorListener != null) {
-      coordinatorListener.onJobModelExpired();
+    // Make the shutdown idempotent
+    if (initiatedShutdown) {
+      LOG.debug("Job Coordinator shutdown is already in progress!");
+      return;
     }
-    //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore
+
+    LOG.info("Shutting down Job Coordinator...");
+    initiatedShutdown = true;
+    boolean shutdownSuccessful = false;
+
+    // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that
+    // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader.
     metrics.isLeader.set(false);
-    debounceTimer.stopScheduler();
-    zkController.stop();
 
-    shutdownMetrics();
-    if (coordinatorListener != null) {
-      coordinatorListener.onCoordinatorStop();
+    try {
+      // todo: what does it mean for coordinator listener to be null? why not have it part of constructor?
+      if (coordinatorListener != null) {
+        coordinatorListener.onJobModelExpired();
+      }
+
+      debounceTimer.stopScheduler();
+
+      LOG.debug("Shutting down ZkController.");
+      zkController.stop();
+
+      LOG.debug("Shutting down system admins.");
+      systemAdmins.stop();
+
+      LOG.debug("Shutting down metrics.");
+      shutdownMetrics();
+
+      if (coordinatorListener != null) {
+        coordinatorListener.onCoordinatorStop();
+      }
+
+      shutdownSuccessful = true;
+    } catch (Throwable t) {
+      LOG.error("Encountered errors during job coordinator stop.", t);
+      if (coordinatorListener != null) {
+        coordinatorListener.onCoordinatorFailure(t);
+      }
+    } finally {
+      LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful);
     }
-    systemAdmins.stop();
   }
 
   private void startMetrics() {
@@ -380,7 +414,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
   }
 
-  /// listener to handle session expiration
+  /// listener to handle ZK state change events
   class ZkSessionStateChangedListener implements IZkStateListener {
 
     private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
@@ -388,19 +422,41 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     @Override
     public void handleStateChanged(Watcher.Event.KeeperState state)
         throws Exception {
-      if (state == Watcher.Event.KeeperState.Expired) {
-        // if the session has expired it means that all the registration's ephemeral nodes are gone.
-        LOG.warn("Got session expired event for processor=" + processorId);
+      switch (state) {
+        case Expired:
+          // if the session has expired it means that all the registration's ephemeral nodes are gone.
+          LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Stopping the container and unregister the processor node.");
 
-        // increase generation of the ZK connection. All the callbacks from the previous generation will be ignored.
-        zkUtils.incGeneration();
+          // increase generation of the ZK session. All the callbacks from the previous generation will be ignored.
+          zkUtils.incGeneration();
 
-        if (coordinatorListener != null) {
-          coordinatorListener.onJobModelExpired();
-        }
-        // reset all the values that might have been from the previous session (e.g ephemeral node path)
-        zkUtils.unregister();
+          if (coordinatorListener != null) {
+            coordinatorListener.onJobModelExpired();
+          }
 
+          // reset all the values that might have been from the previous session (e.g ephemeral node path)
+          zkUtils.unregister();
+          return;
+        case Disconnected:
+          // if the session has expired it means that all the registration's ephemeral nodes are gone.
+          LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Scheduling a coordinator stop.");
+
+          // If the connection is not restored after debounceTimeMs, the process is considered dead.
+          debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, new ZkConfig(config).getZkSessionTimeoutMs(), () -> stop());
+          return;
+        case AuthFailed:
+        case NoSyncConnected:
+        case Unknown:
+          LOG.warn("Got unexpected failure event " + state.toString() + " for processor=" + processorId + ". Stopping the job coordinator.");
+          debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
+          return;
+        case SyncConnected:
+          LOG.info("Got syncconnected event for processor=" + processorId + ".");
+          debounceTimer.cancelAction(ZK_SESSION_ERROR);
+          return;
+        default:
+          // received SyncConnected, ConnectedReadOnly, and SaslAuthenticated. NoOp
+          LOG.info("Got ZK event " + state.toString() + " for processor=" + processorId + ". Continue");
       }
     }
 
@@ -416,7 +472,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     @Override
     public void handleSessionEstablishmentError(Throwable error)
         throws Exception {
-      // this means we cannot connect to zookeeper
+      // this means we cannot connect to zookeeper to establish a session
       LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
       debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index 697833b..7f687d7 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -121,57 +121,4 @@ public class TestScheduleAfterDebounceTime {
     Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
     scheduledQueue.stopScheduler();
   }
-
-  /**
-   * Validates if the interrupted exception triggered by ExecutorService is handled by ScheduleAfterDebounceTime.
-   */
-  @Test
-  public void testStopSchedulerInvokesRegisteredCallback() throws InterruptedException {
-    final CountDownLatch hasTaskCallbackCompleted = new CountDownLatch(1);
-    final CountDownLatch hasThreadStarted = new CountDownLatch(1);
-    final CountDownLatch isSchedulerShutdownTriggered = new CountDownLatch(1);
-
-    /**
-     * Declaring this as an array to record the value inside the lambda.
-     */
-    final Throwable[] taskCallbackException = new Exception[1];
-
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
-    scheduledQueue.setScheduledTaskCallback(throwable -> {
-      /**
-       * Assertion failures in callback doesn't fail the test.
-       * Record the received exception here and assert outside
-       * the callback.
-       */
-        taskCallbackException[0] = throwable;
-        hasTaskCallbackCompleted.countDown();
-      });
-
-    scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME , () -> {
-        hasThreadStarted.countDown();
-        try {
-          LOG.debug("Waiting for the scheduler shutdown trigger.");
-          isSchedulerShutdownTriggered.await();
-        } catch (InterruptedException e) {
-          /**
-           * Don't swallow the exception and restore the interrupt status.
-           * Expect the ScheduleDebounceTime to handle this interrupt
-           * and invoke ScheduledTaskCallback.
-           */
-          Thread.currentThread().interrupt();
-        }
-      });
-
-    // Wait for the task to run.
-    hasThreadStarted.await();
-
-    // Shutdown the scheduler and update relevant state.
-    scheduledQueue.stopScheduler();
-    isSchedulerShutdownTriggered.countDown();
-
-    hasTaskCallbackCompleted.await();
-
-    // Assert on exception thrown.
-    Assert.assertEquals(InterruptedException.class, taskCallbackException[0].getClass());
-  }
 }