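You are viewing a plain-text version of this content. The canonical (HTML) version is available from the original mailing-list archive; its hyperlink was not preserved in this plain-text copy.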
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/17 23:02:28 UTC
samza git commit: SAMZA-1664: ZkJobCoordinator stability fixes.
Repository: samza
Updated Branches:
refs/heads/master 12968cfb6 -> d7a071b34
SAMZA-1664: ZkJobCoordinator stability fixes.
Issues fixed:
* Handle job coordinator shutdown gracefully in case of unclean container shutdowns.
* Fix the zookeeper session handling logic.
* Fix the forever retry timeout in ZkClient re-connect.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Xinyu Liu <xi...@gmail.com>
Closes #476 from shanthoosh/pullInK2Changes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d7a071b3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d7a071b3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d7a071b3
Branch: refs/heads/master
Commit: d7a071b346a2a6d31e90fc038bd30c7ba8cb7ef1
Parents: 12968cf
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Apr 17 16:02:16 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Apr 17 16:02:16 2018 -0700
----------------------------------------------------------------------
.../samza/zk/ScheduleAfterDebounceTime.java | 35 +++++--
.../org/apache/samza/zk/ZkJobCoordinator.java | 100 +++++++++++++++----
.../samza/zk/TestScheduleAfterDebounceTime.java | 53 ----------
3 files changed, 102 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index b53d245..f6f2dc9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -20,12 +20,12 @@
package org.apache.samza.zk;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
@@ -55,7 +55,8 @@ public class ScheduleAfterDebounceTime {
/**
* A map from actionName to {@link ScheduledFuture} of task scheduled for execution.
*/
- private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
+ private final Map<String, ScheduledFuture> futureHandles = new ConcurrentHashMap<>();
+ private boolean isShuttingDown;
public ScheduleAfterDebounceTime() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
@@ -93,12 +94,24 @@ public class ScheduleAfterDebounceTime {
* and all pending enqueued tasks will be cancelled.
*/
public synchronized void stopScheduler() {
- LOG.info("Stopping Scheduler");
+ if (isShuttingDown) {
+ LOG.debug("Debounce timer shutdown is already in progress!");
+ return;
+ }
+
+ isShuttingDown = true;
+ LOG.info("Shutting down debounce timer!");
- scheduledExecutorService.shutdownNow();
+ // changing it back to use shutdown instead to prevent interruptions on the active task
+ scheduledExecutorService.shutdown();
+
+ // should clear out the future handles as well
+ futureHandles.keySet()
+ .forEach(this::tryCancelScheduledAction);
+ }
- // Clear the existing future handles.
- futureHandles.clear();
+ public synchronized void cancelAction(String action) {
+ this.tryCancelScheduledAction(action);
}
/**
@@ -144,22 +157,22 @@ public class ScheduleAfterDebounceTime {
} else {
LOG.debug("Action: {} completed successfully.", actionName);
}
- } catch (Throwable t) {
- LOG.error("Execution of action: {} failed.", actionName, t);
- doCleanUpOnTaskException(t);
+ } catch (Throwable throwable) {
+ LOG.error("Execution of action: {} failed.", actionName, throwable);
+ doCleanUpOnTaskException(throwable);
}
};
}
/**
- * Handler method to invoke on a throwable during an scheduled task execution and which
+ * Handler method to invoke on a exception during an scheduled task execution and which
* the following operations in sequential order.
* <ul>
* <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li>
* <li> Invokes the onError handler method if taskCallback is defined.</li>
* </ul>
*
- * @param throwable the throwable that happened during task execution.
+ * @param throwable the exception happened during task execution.
*/
private void doCleanUpOnTaskException(Throwable throwable) {
stopScheduler();
http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 407291a..0e0f815 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -58,6 +58,8 @@ import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.zookeeper.Watcher.Event.KeeperState.*;
+
/**
* JobCoordinator for stand alone processor managed via Zookeeper.
*/
@@ -97,6 +99,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private JobModel newJobModel;
private int debounceTimeMs;
private boolean hasCreatedStreams = false;
+ private boolean initiatedShutdown = false;
private String cachedJobModelVersion = null;
private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
@@ -121,7 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
debounceTimer = new ScheduleAfterDebounceTime();
debounceTimer.setScheduledTaskCallback(throwable -> {
- LOG.error("Received exception from in JobCoordinator Processing!", throwable);
+ LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
stop();
});
systemAdmins = new SystemAdmins(config);
@@ -137,19 +140,50 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public synchronized void stop() {
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
+ // Make the shutdown idempotent
+ if (initiatedShutdown) {
+ LOG.debug("Job Coordinator shutdown is already in progress!");
+ return;
}
- //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore
+
+ LOG.info("Shutting down Job Coordinator...");
+ initiatedShutdown = true;
+ boolean shutdownSuccessful = false;
+
+ // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that
+ // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader.
metrics.isLeader.set(false);
- debounceTimer.stopScheduler();
- zkController.stop();
- shutdownMetrics();
- if (coordinatorListener != null) {
- coordinatorListener.onCoordinatorStop();
+ try {
+ // todo: what does it mean for coordinator listener to be null? why not have it part of constructor?
+ if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
+ }
+
+ debounceTimer.stopScheduler();
+
+ LOG.debug("Shutting down ZkController.");
+ zkController.stop();
+
+ LOG.debug("Shutting down system admins.");
+ systemAdmins.stop();
+
+ LOG.debug("Shutting down metrics.");
+ shutdownMetrics();
+
+ if (coordinatorListener != null) {
+ coordinatorListener.onCoordinatorStop();
+ }
+
+ shutdownSuccessful = true;
+ } catch (Throwable t) {
+ LOG.error("Encountered errors during job coordinator stop.", t);
+ if (coordinatorListener != null) {
+ coordinatorListener.onCoordinatorFailure(t);
+ }
+ } finally {
+ LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful);
}
- systemAdmins.stop();
}
private void startMetrics() {
@@ -380,7 +414,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
}
- /// listener to handle session expiration
+ /// listener to handle ZK state change events
class ZkSessionStateChangedListener implements IZkStateListener {
private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
@@ -388,19 +422,41 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state)
throws Exception {
- if (state == Watcher.Event.KeeperState.Expired) {
- // if the session has expired it means that all the registration's ephemeral nodes are gone.
- LOG.warn("Got session expired event for processor=" + processorId);
+ switch (state) {
+ case Expired:
+ // if the session has expired it means that all the registration's ephemeral nodes are gone.
+ LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Stopping the container and unregister the processor node.");
- // increase generation of the ZK connection. All the callbacks from the previous generation will be ignored.
- zkUtils.incGeneration();
+ // increase generation of the ZK session. All the callbacks from the previous generation will be ignored.
+ zkUtils.incGeneration();
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
- }
- // reset all the values that might have been from the previous session (e.g ephemeral node path)
- zkUtils.unregister();
+ if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
+ }
+ // reset all the values that might have been from the previous session (e.g ephemeral node path)
+ zkUtils.unregister();
+ return;
+ case Disconnected:
+ // if the session has expired it means that all the registration's ephemeral nodes are gone.
+ LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Scheduling a coordinator stop.");
+
+ // If the connection is not restored after debounceTimeMs, the process is considered dead.
+ debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, new ZkConfig(config).getZkSessionTimeoutMs(), () -> stop());
+ return;
+ case AuthFailed:
+ case NoSyncConnected:
+ case Unknown:
+ LOG.warn("Got unexpected failure event " + state.toString() + " for processor=" + processorId + ". Stopping the job coordinator.");
+ debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
+ return;
+ case SyncConnected:
+ LOG.info("Got syncconnected event for processor=" + processorId + ".");
+ debounceTimer.cancelAction(ZK_SESSION_ERROR);
+ return;
+ default:
+ // received SyncConnected, ConnectedReadOnly, and SaslAuthenticated. NoOp
+ LOG.info("Got ZK event " + state.toString() + " for processor=" + processorId + ". Continue");
}
}
@@ -416,7 +472,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public void handleSessionEstablishmentError(Throwable error)
throws Exception {
- // this means we cannot connect to zookeeper
+ // this means we cannot connect to zookeeper to establish a session
LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index 697833b..7f687d7 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -121,57 +121,4 @@ public class TestScheduleAfterDebounceTime {
Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
scheduledQueue.stopScheduler();
}
-
- /**
- * Validates if the interrupted exception triggered by ExecutorService is handled by ScheduleAfterDebounceTime.
- */
- @Test
- public void testStopSchedulerInvokesRegisteredCallback() throws InterruptedException {
- final CountDownLatch hasTaskCallbackCompleted = new CountDownLatch(1);
- final CountDownLatch hasThreadStarted = new CountDownLatch(1);
- final CountDownLatch isSchedulerShutdownTriggered = new CountDownLatch(1);
-
- /**
- * Declaring this as an array to record the value inside the lambda.
- */
- final Throwable[] taskCallbackException = new Exception[1];
-
- ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
- scheduledQueue.setScheduledTaskCallback(throwable -> {
- /**
- * Assertion failures in callback doesn't fail the test.
- * Record the received exception here and assert outside
- * the callback.
- */
- taskCallbackException[0] = throwable;
- hasTaskCallbackCompleted.countDown();
- });
-
- scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME , () -> {
- hasThreadStarted.countDown();
- try {
- LOG.debug("Waiting for the scheduler shutdown trigger.");
- isSchedulerShutdownTriggered.await();
- } catch (InterruptedException e) {
- /**
- * Don't swallow the exception and restore the interrupt status.
- * Expect the ScheduleDebounceTime to handle this interrupt
- * and invoke ScheduledTaskCallback.
- */
- Thread.currentThread().interrupt();
- }
- });
-
- // Wait for the task to run.
- hasThreadStarted.await();
-
- // Shutdown the scheduler and update relevant state.
- scheduledQueue.stopScheduler();
- isSchedulerShutdownTriggered.countDown();
-
- hasTaskCallbackCompleted.await();
-
- // Assert on exception thrown.
- Assert.assertEquals(InterruptedException.class, taskCallbackException[0].getClass());
- }
}