You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/09/15 04:19:23 UTC
samza git commit: SAMZA-1406: Fix potential orphaned containers
problem in stand alone
Repository: samza
Updated Branches:
refs/heads/0.14.0 ebce13e74 -> c45c7747a
SAMZA-1406: Fix potential orphaned containers problem in stand alone
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c45c7747
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c45c7747
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c45c7747
Branch: refs/heads/0.14.0
Commit: c45c7747ae371eee11e2f41dd4e32a53b12c6c91
Parents: ebce13e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Sep 14 21:17:36 2017 -0700
Committer: navina <na...@apache.org>
Committed: Thu Sep 14 21:18:55 2017 -0700
----------------------------------------------------------------------
.../samza/zk/ScheduleAfterDebounceTime.java | 172 +++++++++++++------
.../org/apache/samza/zk/ZkJobCoordinator.java | 26 ++-
.../samza/zk/TestScheduleAfterDebounceTime.java | 74 +++++++-
3 files changed, 204 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 6174063..3a7dca9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -20,99 +20,157 @@
package org.apache.samza.zk;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class allows scheduling a Runnable actions after some debounce time.
* When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
- * future in a map, keyed by the action name. Here we predefine some actions, which are used in the
- * ZK based standalone app.
+ * future in a map, keyed by the action name.
*/
public class ScheduleAfterDebounceTime {
- public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
- public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
+ private static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+ private static final String DEBOUNCE_THREAD_NAME_FORMAT = "debounce-thread-%d";
- // Here we predefine some actions which are used in the ZK based standalone app.
- // Action name when the JobModel version changes
- public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
-
- // Action name when the Processor membership changes
- public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+ // timeout to wait for a task to complete.
+ private static final int TIMEOUT_MS = 1000 * 10;
/**
- *
- * cleanup process is started after every new job model generation is complete.
- * It deletes old versions of job model and the barrier.
- * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE.
- **/
- public static final String ON_ZK_CLEANUP = "OnCleanUp";
+ * {@link ScheduledTaskCallback} associated with the scheduler. OnError method of the
+ * callback will be invoked on first scheduled task failure.
+ */
+ private Optional<ScheduledTaskCallback> scheduledTaskCallback;
- private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
+ // Responsible for scheduling delayed actions.
+ private final ScheduledExecutorService scheduledExecutorService;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build());
+ /**
+ * A map from actionName to {@link ScheduledFuture} of task scheduled for execution.
+ */
private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
- // Ideally, this should be only used for testing. But ZkBarrierForVersionUpgrades uses it. This needs to be fixed.
- // TODO: Timer shouldn't be passed around the components. It should be associated with the JC or the caller of
- // coordinationUtils.
public ScheduleAfterDebounceTime() {
- this.scheduledTaskFailureCallback = null;
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+
+ public void setScheduledTaskCallback(ScheduledTaskCallback scheduledTaskCallback) {
+ this.scheduledTaskCallback = Optional.ofNullable(scheduledTaskCallback);
}
- public ScheduleAfterDebounceTime(ScheduledTaskFailureCallback errorScheduledTaskFailureCallback) {
- this.scheduledTaskFailureCallback = errorScheduledTaskFailureCallback;
+ /**
+ * Performs the following operations in sequential order.
+ * <ul>
+ * <li> Makes best effort to cancel any existing task in task queue associated with the action.</li>
+ * <li> Schedules the incoming action for later execution and records its future.</li>
+ * </ul>
+ *
+ * @param actionName the name of scheduleable action.
+ * @param delayInMillis the time from now to delay execution.
+ * @param runnable the action to execute.
+ */
+ public synchronized void scheduleAfterDebounceTime(String actionName, long delayInMillis, Runnable runnable) {
+ // 1. Try to cancel any existing scheduled task associated with the action.
+ tryCancelScheduledAction(actionName);
+
+ // 2. Schedule the action.
+ ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), delayInMillis, TimeUnit.MILLISECONDS);
+
+ LOG.info("Scheduled action: {} to run after: {} milliseconds.", actionName, delayInMillis);
+ futureHandles.put(actionName, scheduledFuture);
}
- synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
- // check if this action has been scheduled already
- ScheduledFuture sf = futureHandles.get(actionName);
- if (sf != null && !sf.isDone()) {
- LOG.info("cancel future for " + actionName);
- // attempt to cancel
- if (!sf.cancel(false)) {
+ /**
+ * Stops the scheduler. After this invocation no further schedule calls will be accepted
+ * and all pending enqueued tasks will be cancelled.
+ */
+ public synchronized void stopScheduler() {
+ scheduledExecutorService.shutdownNow();
+
+ // Clear the existing future handles.
+ futureHandles.clear();
+ }
+
+ /**
+ * Tries to cancel the task that belongs to {@code actionName} submitted to the queue.
+ *
+ * @param actionName the name of action to cancel.
+ */
+ private void tryCancelScheduledAction(String actionName) {
+ ScheduledFuture scheduledFuture = futureHandles.get(actionName);
+ if (scheduledFuture != null && !scheduledFuture.isDone()) {
+ LOG.info("Attempting to cancel the future of action: {}", actionName);
+ // Attempt to cancel
+ if (!scheduledFuture.cancel(false)) {
try {
- sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// we ignore the exception
- LOG.warn("cancel for action " + actionName + " failed with ", e);
+ LOG.warn("Cancelling the future of action: {} failed.", actionName, e);
}
}
futureHandles.remove(actionName);
}
- // schedule a new task
- sf = scheduledExecutorService.schedule(() -> {
- try {
- runnable.run();
- LOG.debug(actionName + " completed successfully.");
- } catch (Throwable t) {
- LOG.error(actionName + " threw an exception.", t);
- if (scheduledTaskFailureCallback != null) {
- scheduledTaskFailureCallback.onError(t);
- }
+ }
+
+ /**
+ * Decorate the executable action with exception handlers to facilitate cleanup on failures.
+ *
+ * @param actionName the name of the scheduleable action.
+ * @param runnable the action to execute.
+ * @return the executable action decorated with exception handlers.
+ */
+ private Runnable getScheduleableAction(String actionName, Runnable runnable) {
+ return () -> {
+ try {
+ runnable.run();
+ /*
+ * Expects all run() implementations <b>not to swallow the interrupts.</b>
+ * This thread is interrupted by an external source (mostly the executor service) so that it terminates.
+ */
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Action: {} is interrupted.", actionName);
+ doCleanUpOnTaskException(new InterruptedException());
+ } else {
+ LOG.debug("Action: {} completed successfully.", actionName);
}
- },
- debounceTimeMs,
- TimeUnit.MILLISECONDS);
- LOG.info("scheduled " + actionName + " in " + debounceTimeMs);
- futureHandles.put(actionName, sf);
+ } catch (Exception exception) {
+ LOG.error("Execution of action: {} failed.", actionName, exception);
+ doCleanUpOnTaskException(exception);
+ }
+ };
}
- public void stopScheduler() {
- // shutdown executor service
- scheduledExecutorService.shutdown();
+ /**
+ * Handler method invoked on an exception during a scheduled task execution. It performs
+ * the following operations in sequential order.
+ * <ul>
+ * <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li>
+ * <li> Invokes the onError handler method if taskCallback is defined.</li>
+ * </ul>
+ *
+ * @param exception the exception happened during task execution.
+ */
+ private void doCleanUpOnTaskException(Exception exception) {
+ stopScheduler();
+
+ scheduledTaskCallback.ifPresent(callback -> callback.onError(exception));
}
- interface ScheduledTaskFailureCallback {
+ /**
+ * ScheduledTaskCallback::onError() is invoked on the first occurrence of an exception
+ * when executing a task. Provides a hook for handling failures
+ * in an asynchronous scheduled task execution.
+ */
+ interface ScheduledTaskCallback {
void onError(Throwable throwable);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9f64b3a..2b8349c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -61,6 +61,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private static final int METADATA_CACHE_TTL_MS = 5000;
private static final int NUM_VERSIONS_TO_LEAVE = 10;
+ // Action name when the JobModel version changes
+ private static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
+
+ // Action name when the Processor membership changes
+ private static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+
+ /**
+ * Cleanup process is started after every new job model generation is complete.
+ * It deletes old versions of job model and the barrier.
+ * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE.
+ **/
+ private static final String ON_ZK_CLEANUP = "OnCleanUp";
+
private final ZkUtils zkUtils;
private final String processorId;
private final ZkController zkController;
@@ -95,7 +108,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
new ZkBarrierListenerImpl());
this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
- debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+ debounceTimer = new ScheduleAfterDebounceTime();
+ debounceTimer.setScheduledTaskCallback(throwable -> {
LOG.error("Received exception from in JobCoordinator Processing!", throwable);
stop();
});
@@ -157,7 +171,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public void onProcessorChange(List<String> processors) {
LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
- debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs,
+ debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs,
() -> doOnProcessorChange(processors));
}
@@ -195,12 +209,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
- debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
+ debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
}
@Override
public void onNewJobModelAvailable(final String version) {
- debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
+ debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () ->
{
LOG.info("pid=" + processorId + "new JobModel available");
// get the new job model from ZK
@@ -273,7 +287,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
metrics.isLeader.set(true);
zkController.subscribeToProcessorChange();
- debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+ debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
{
// actual actions to do are the same as onProcessorChange
doOnProcessorChange(new ArrayList<>());
@@ -312,7 +326,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
LOG.warn("Barrier for version " + version + " timed out.");
if (zkController.isLeader()) {
LOG.info("Leader will schedule a new job model generation");
- debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+ debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
{
// actual actions to do are the same as onProcessorChange
doOnProcessorChange(new ArrayList<>());
http://git-wip-us.apache.org/repos/asf/samza/blob/c45c7747/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index d3152be..a681767 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -19,15 +19,23 @@
package org.apache.samza.zk;
-import org.junit.Assert;
-import org.junit.Test;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestScheduleAfterDebounceTime {
+ private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class);
+
private static final long WAIT_TIME = 500;
+ @Rule
+ public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
+
class TestObj {
private volatile int i = 0;
public void inc() {
@@ -91,8 +99,10 @@ public class TestScheduleAfterDebounceTime {
@Test
public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
- ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(e -> {
- Assert.assertEquals(RuntimeException.class, e.getClass());
+ final Throwable[] taskCallbackException = new Exception[1];
+ ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+ scheduledQueue.setScheduledTaskCallback(throwable -> {
+ taskCallbackException[0] = throwable;
latch.countDown();
});
@@ -107,6 +117,60 @@ public class TestScheduleAfterDebounceTime {
boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS);
Assert.assertTrue("Latch timed-out.", result);
Assert.assertEquals(0, testObj.get());
+ Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
+ scheduledQueue.stopScheduler();
+ }
+
+ /**
+ * Validates that an interrupt triggered by the ExecutorService is handled by ScheduleAfterDebounceTime.
+ */
+ @Test
+ public void testStopSchedulerInvokesRegisteredCallback() throws InterruptedException {
+ final CountDownLatch hasTaskCallbackCompleted = new CountDownLatch(1);
+ final CountDownLatch hasThreadStarted = new CountDownLatch(1);
+ final CountDownLatch isSchedulerShutdownTriggered = new CountDownLatch(1);
+
+ /**
+ * Declaring this as an array to record the value inside the lambda.
+ */
+ final Throwable[] taskCallbackException = new Exception[1];
+
+ ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+ scheduledQueue.setScheduledTaskCallback(throwable -> {
+ /**
+ * Assertion failures in the callback don't fail the test.
+ * Record the received exception here and assert outside
+ * the callback.
+ */
+ taskCallbackException[0] = throwable;
+ hasTaskCallbackCompleted.countDown();
+ });
+
+ scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME , () -> {
+ hasThreadStarted.countDown();
+ try {
+ LOG.debug("Waiting for the scheduler shutdown trigger.");
+ isSchedulerShutdownTriggered.await();
+ } catch (InterruptedException e) {
+ /**
+ * Don't swallow the exception and restore the interrupt status.
+ * Expect ScheduleAfterDebounceTime to handle this interrupt
+ * and invoke ScheduledTaskCallback.
+ */
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Wait for the task to run.
+ hasThreadStarted.await();
+
+ // Shutdown the scheduler and update relevant state.
scheduledQueue.stopScheduler();
+ isSchedulerShutdownTriggered.countDown();
+
+ hasTaskCallbackCompleted.await();
+
+ // Assert on exception thrown.
+ Assert.assertEquals(InterruptedException.class, taskCallbackException[0].getClass());
}
}