You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 12:20:29 UTC
[flink] 01/04: [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3aa68d52af710ad6c0574d5a90ee898f5e58efc9
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Mon Jul 8 17:11:46 2019 +0200
[FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread
---
.../flink/streaming/runtime/tasks/StreamTask.java | 29 ++---
.../runtime/tasks/SynchronousSavepointLatch.java | 55 ++++----
.../runtime/tasks/SourceTaskTerminationTest.java | 103 +++++----------
.../runtime/tasks/SynchronousCheckpointITCase.java | 27 +---
.../tasks/SynchronousSavepointSyncLatchTest.java | 144 +++++++++++----------
5 files changed, 148 insertions(+), 210 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 52e2011..7960a2f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -686,7 +686,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
CheckpointMetrics checkpointMetrics) throws Exception {
try {
- performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false);
+ if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false)) {
+ if (syncSavepointLatch.isSet()) {
+ syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
+ }
+ }
}
catch (CancelTaskException e) {
LOG.info("Operator {} was cancelled while performing checkpoint {}.",
@@ -723,7 +727,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
final long checkpointId = checkpointMetaData.getCheckpointId();
- final boolean result;
synchronized (lock) {
if (isRunning) {
@@ -754,7 +757,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
- result = true;
+ return true;
}
else {
// we cannot perform our checkpoint - let the downstream operators know that they
@@ -779,21 +782,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
throw exception;
}
- result = false;
- }
- }
-
- if (isRunning && syncSavepointLatch.isSet()) {
-
- final boolean checkpointWasAcked =
- syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
-
- if (checkpointWasAcked) {
- finishTask();
+ return false;
}
}
-
- return result;
}
public ExecutorService getAsyncOperationsThreadPool() {
@@ -802,6 +793,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ boolean success = false;
synchronized (lock) {
if (isRunning) {
LOG.debug("Notification of complete checkpoint for task {}", getName());
@@ -812,12 +804,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId);
+ success = true;
}
else {
LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
}
}
+ if (success) {
+ syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);
+ }
}
private void tryShutdownTimerService() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
index 0a67dac..d10c19c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.util.function.RunnableWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
/**
* A synchronization primitive used by the {@link StreamTask} to wait
@@ -29,12 +33,17 @@ class SynchronousSavepointLatch {
private static final long NOT_SET_CHECKPOINT_ID = -1L;
- // these are mutually exclusive
+ enum CompletionResult {
+ COMPLETED,
+ CANCELED,
+ }
+
+ @GuardedBy("synchronizationPoint")
private volatile boolean waiting;
- private volatile boolean completed;
- private volatile boolean canceled;
- private volatile boolean wasAlreadyCompleted;
+ @GuardedBy("synchronizationPoint")
+ @Nullable
+ private volatile CompletionResult completionResult;
private final Object synchronizationPoint;
@@ -44,8 +53,6 @@ class SynchronousSavepointLatch {
this.synchronizationPoint = new Object();
this.waiting = false;
- this.completed = false;
- this.canceled = false;
this.checkpointId = NOT_SET_CHECKPOINT_ID;
}
@@ -59,45 +66,38 @@ class SynchronousSavepointLatch {
}
}
- boolean blockUntilCheckpointIsAcknowledged() throws Exception {
+ void blockUntilCheckpointIsAcknowledged() throws Exception {
synchronized (synchronizationPoint) {
- if (isSet() && !isDone()) {
+ if (completionResult == null && isSet()) {
waiting = true;
synchronizationPoint.wait();
waiting = false;
}
-
- if (!isCanceled() && !wasAlreadyCompleted) {
- wasAlreadyCompleted = true;
- return true;
- }
-
- return false;
}
}
- void acknowledgeCheckpointAndTrigger(final long checkpointId) {
+ void acknowledgeCheckpointAndTrigger(final long checkpointId, RunnableWithException runnable) throws Exception {
synchronized (synchronizationPoint) {
- if (isSet() && !isDone() && this.checkpointId == checkpointId) {
- completed = true;
- synchronizationPoint.notifyAll();
+ if (completionResult == null && this.checkpointId == checkpointId) {
+ completionResult = CompletionResult.COMPLETED;
+ try {
+ runnable.run();
+ } finally {
+ synchronizationPoint.notifyAll();
+ }
}
}
}
void cancelCheckpointLatch() {
synchronized (synchronizationPoint) {
- if (!isDone()) {
- canceled = true;
+ if (completionResult == null) {
+ completionResult = CompletionResult.CANCELED;
synchronizationPoint.notifyAll();
}
}
}
- private boolean isDone () {
- return canceled || completed;
- }
-
@VisibleForTesting
boolean isWaiting() {
return waiting;
@@ -105,15 +105,14 @@ class SynchronousSavepointLatch {
@VisibleForTesting
boolean isCompleted() {
- return completed;
+ return completionResult == CompletionResult.COMPLETED;
}
@VisibleForTesting
boolean isCanceled() {
- return canceled;
+ return completionResult == CompletionResult.CANCELED;
}
- @VisibleForTesting
boolean isSet() {
return checkpointId != NOT_SET_CHECKPOINT_ID;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 9295788..4a613d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -33,62 +32,56 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
-import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* A test verifying the termination process
* (synchronous checkpoint and task termination) at the {@link SourceStreamTask}.
*/
-public class SourceTaskTerminationTest {
+public class SourceTaskTerminationTest extends TestLogger {
private static OneShotLatch ready;
private static MultiShotLatch runLoopStart;
private static MultiShotLatch runLoopEnd;
- private static AtomicReference<Throwable> error;
+ @Rule
+ public final Timeout timeoutPerTest = Timeout.seconds(20);
@Before
public void initialize() {
ready = new OneShotLatch();
runLoopStart = new MultiShotLatch();
runLoopEnd = new MultiShotLatch();
- error = new AtomicReference<>();
-
- error.set(null);
- }
-
- @After
- public void validate() {
- validateNoExceptionsWereThrown();
}
@Test
- public void terminateShouldBlockDuringCheckpointingAndEmitMaxWatermark() throws Exception {
+ public void testStopWithSavepointWithMaxWatermark() throws Exception {
stopWithSavepointStreamTaskTestHelper(true);
}
@Test
- public void suspendShouldBlockDuringCheckpointingAndNotEmitMaxWatermark() throws Exception {
+ public void testStopWithSavepointWithoutMaxWatermark() throws Exception {
stopWithSavepointStreamTaskTestHelper(false);
}
- private void stopWithSavepointStreamTaskTestHelper(final boolean expectMaxWatermark) throws Exception {
+ private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark) throws Exception {
final long syncSavepointId = 34L;
final StreamTaskTestHarness<Long> srcTaskTestHarness = getSourceStreamTaskTestHarness();
final Thread executionThread = srcTaskTestHarness.invoke();
final StreamTask<Long, ?> srcTask = srcTaskTestHarness.getTask();
+ final SynchronousSavepointLatch syncSavepointLatch = srcTask.getSynchronousSavepointLatch();
ready.await();
@@ -96,15 +89,29 @@ public class SourceTaskTerminationTest {
emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
- emitAndVerifyCheckpoint(srcTaskTestHarness, srcTask, 31L);
+ srcTask.triggerCheckpoint(
+ new CheckpointMetaData(31L, 900),
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ false);
+
+ assertFalse(syncSavepointLatch.isSet());
+ assertFalse(syncSavepointLatch.isCompleted());
+ assertFalse(syncSavepointLatch.isWaiting());
+
+ verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
- final Thread syncSavepointThread = triggerSynchronousSavepointFromDifferentThread(srcTask, expectMaxWatermark, syncSavepointId);
+ srcTask.triggerCheckpoint(
+ new CheckpointMetaData(syncSavepointId, 900),
+ new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
+ withMaxWatermark);
- final SynchronousSavepointLatch syncSavepointFuture = waitForSyncSavepointFutureToBeSet(srcTask);
+ assertTrue(syncSavepointLatch.isSet());
+ assertFalse(syncSavepointLatch.isCompleted());
+ assertFalse(syncSavepointLatch.isWaiting());
- if (expectMaxWatermark) {
+ if (withMaxWatermark) {
// if we are in TERMINATE mode, we expect the source task
// to emit MAX_WM before the SYNC_SAVEPOINT barrier.
verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
@@ -112,54 +119,12 @@ public class SourceTaskTerminationTest {
verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), syncSavepointId);
- assertFalse(syncSavepointFuture.isCompleted());
- assertTrue(syncSavepointFuture.isWaiting());
-
srcTask.notifyCheckpointComplete(syncSavepointId);
- assertTrue(syncSavepointFuture.isCompleted());
+ assertTrue(syncSavepointLatch.isCompleted());
- syncSavepointThread.join();
executionThread.join();
}
- private void validateNoExceptionsWereThrown() {
- if (error.get() != null && !(error.get() instanceof CancelTaskException)) {
- fail(error.get().getMessage());
- }
- }
-
- private Thread triggerSynchronousSavepointFromDifferentThread(
- final StreamTask<Long, ?> task,
- final boolean advanceToEndOfEventTime,
- final long syncSavepointId) {
- final Thread checkpointingThread = new Thread(() -> {
- try {
- task.triggerCheckpoint(
- new CheckpointMetaData(syncSavepointId, 900),
- new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
- advanceToEndOfEventTime);
- } catch (Exception e) {
- error.set(e);
- }
- });
- checkpointingThread.start();
-
- return checkpointingThread;
-
- }
-
- private void emitAndVerifyCheckpoint(
- final StreamTaskTestHarness<Long> srcTaskTestHarness,
- final StreamTask<Long, ?> srcTask,
- final long checkpointId) throws Exception {
-
- srcTask.triggerCheckpoint(
- new CheckpointMetaData(checkpointId, 900),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
- verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), checkpointId);
- }
-
private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() {
final StreamTaskTestHarness<Long> testHarness = new StreamTaskTestHarness<>(
SourceStreamTask::new,
@@ -177,16 +142,6 @@ public class SourceTaskTerminationTest {
return testHarness;
}
- private SynchronousSavepointLatch waitForSyncSavepointFutureToBeSet(final StreamTask streamTaskUnderTest) throws InterruptedException {
- final SynchronousSavepointLatch syncSavepointFuture = streamTaskUnderTest.getSynchronousSavepointLatch();
- while (!syncSavepointFuture.isWaiting()) {
- Thread.sleep(10L);
-
- validateNoExceptionsWereThrown();
- }
- return syncSavepointFuture;
- }
-
private void emitAndVerifyWatermarkAndElement(
final StreamTaskTestHarness<Long> srcTaskTestHarness,
final long expectedElement) throws InterruptedException {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index af09716..eee3a62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
@@ -69,12 +68,10 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
-import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -89,8 +86,6 @@ import static org.mockito.Mockito.when;
*/
public class SynchronousCheckpointITCase {
- private static OneShotLatch checkpointTriggered = new OneShotLatch();
-
// A thread-safe queue to "log" and monitor events happening in the task's methods. Also, used by the test thread
// to synchronize actions with the task's threads.
private static LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
@@ -99,7 +94,7 @@ public class SynchronousCheckpointITCase {
public final Timeout timeoutPerTest = Timeout.seconds(10);
@Test
- public void taskCachedThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
+ public void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
final Task task = createTask(SynchronousCheckpointTestingTask.class);
try (TaskCleaner ignored = new TaskCleaner(task)) {
@@ -110,12 +105,6 @@ public class SynchronousCheckpointITCase {
assertEquals(ExecutionState.RUNNING, task.getExecutionState());
- // Hack: we are triggering a checkpoint with advanceToEndOfEventTime = true, to be sure that
- // triggerCheckpointBarrier has reached the sync checkpoint latch (by verifying in
- // SynchronousCheckpointTestingTask.advanceToEndOfEventTime) and only then proceeding to
- // notifyCheckpointComplete.
- // Without such synchronization, the notifyCheckpointComplete execution may be executed first and leave this
- // test in a deadlock.
task.triggerCheckpointBarrier(
42,
156865867234L,
@@ -123,16 +112,13 @@ public class SynchronousCheckpointITCase {
true);
assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT));
+ assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT));
assertTrue(eventQueue.isEmpty());
- checkpointTriggered.await();
-
task.notifyCheckpointComplete(42);
assertThat(eventQueue.take(), is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE));
- assertThat(
- Arrays.asList(eventQueue.take(), eventQueue.take()),
- containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, Event.POST_TRIGGER_CHECKPOINT));
+ assertThat(eventQueue.take(), is(Event.POST_NOTIFY_CHECKPOINT_COMPLETE));
assertTrue(eventQueue.isEmpty());
assertEquals(ExecutionState.RUNNING, task.getExecutionState());
@@ -197,13 +183,6 @@ public class SynchronousCheckpointITCase {
protected void cleanup() {
}
-
- @Override
- protected void advanceToEndOfEventTime() throws Exception {
- // Wake up the test thread that we have actually entered the checkpoint invocation and the sync checkpoint
- // latch is set.
- checkpointTriggered.trigger();
- }
}
/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
index fa2f9b7..59f5a81 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
@@ -18,17 +18,20 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.util.function.RunnableWithException;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -53,120 +56,127 @@ public class SynchronousSavepointSyncLatchTest {
}
@Test
- public void waitAndThenTriggerWorks() throws Exception {
+ public void triggerUnblocksWait() throws Exception {
final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
- final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L);
- executors.submit(callable);
+ latchUnderTest.setCheckpointId(1L);
+ assertFalse(latchUnderTest.isWaiting());
- while (!latchUnderTest.isSet()) {
+ Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest);
+ while (!latchUnderTest.isWaiting()) {
Thread.sleep(5L);
}
+ final AtomicBoolean triggered = new AtomicBoolean();
+
// wrong checkpoint id.
- latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
+ latchUnderTest.acknowledgeCheckpointAndTrigger(2L, () -> triggered.set(true));
+ assertFalse(triggered.get());
+ assertFalse(latchUnderTest.isCompleted());
assertTrue(latchUnderTest.isWaiting());
- latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+ latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true));
+ assertTrue(triggered.get());
assertTrue(latchUnderTest.isCompleted());
+
+ future.get();
+ assertFalse(latchUnderTest.isWaiting());
}
@Test
- public void waitAndThenCancelWorks() throws Exception {
+ public void cancelUnblocksWait() throws Exception {
final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
- final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L);
- final Future<Boolean> resultFuture = executors.submit(callable);
+ latchUnderTest.setCheckpointId(1L);
+ assertFalse(latchUnderTest.isWaiting());
- while (!latchUnderTest.isSet()) {
+ Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest);
+ while (!latchUnderTest.isWaiting()) {
Thread.sleep(5L);
}
latchUnderTest.cancelCheckpointLatch();
-
- boolean result = resultFuture.get();
-
- assertFalse(result);
assertTrue(latchUnderTest.isCanceled());
+
+ future.get();
+ assertFalse(latchUnderTest.isWaiting());
}
@Test
- public void triggeringReturnsTrueAtMostOnce() throws Exception {
+ public void waitAfterTriggerIsNotBlocking() throws Exception {
final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
- final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
- final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-
- final Future<Boolean> firstFuture = executors.submit(firstCallable);
- final Future<Boolean> secondFuture = executors.submit(secondCallable);
+ latchUnderTest.setCheckpointId(1L);
+ latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> {});
- while (!latchUnderTest.isSet()) {
- Thread.sleep(5L);
- }
+ latchUnderTest.blockUntilCheckpointIsAcknowledged();
+ }
- latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+ @Test
+ public void waitAfterCancelIsNotBlocking() throws Exception {
+ final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
- final boolean firstResult = firstFuture.get();
- final boolean secondResult = secondFuture.get();
+ latchUnderTest.setCheckpointId(1L);
+ latchUnderTest.cancelCheckpointLatch();
+ assertTrue(latchUnderTest.isCanceled());
- // only one of the two can be true (it is a race so we do not know which one)
- assertTrue(firstResult ^ secondResult);
+ latchUnderTest.blockUntilCheckpointIsAcknowledged();
}
@Test
- public void waitAfterTriggerReturnsTrueImmediately() throws Exception {
+ public void triggeringInvokesCallbackAtMostOnce() throws Exception {
final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
+
latchUnderTest.setCheckpointId(1L);
- latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
- final boolean triggerred = latchUnderTest.blockUntilCheckpointIsAcknowledged();
- assertTrue(triggerred);
+
+ AtomicInteger counter = new AtomicInteger();
+ Future<Void> future1 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+ Future<Void> future2 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+ Future<Void> future3 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+ future1.get();
+ future2.get();
+ future3.get();
+
+ assertEquals(1, counter.get());
}
@Test
- public void waitAfterCancelDoesNothing() throws Exception {
+ public void triggeringAfterCancelDoesNotInvokeCallback() throws Exception {
final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
+
latchUnderTest.setCheckpointId(1L);
latchUnderTest.cancelCheckpointLatch();
- latchUnderTest.blockUntilCheckpointIsAcknowledged();
+ assertTrue(latchUnderTest.isCanceled());
+
+ final AtomicBoolean triggered = new AtomicBoolean();
+ latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true));
+ assertFalse(triggered.get());
}
@Test
- public void checkpointIdIsSetOnlyOnce() throws InterruptedException {
+ public void checkpointIdIsSetOnlyOnce() {
final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
- final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
- executors.submit(firstCallable);
-
- while (!latchUnderTest.isSet()) {
- Thread.sleep(5L);
- }
-
- final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 2L);
- executors.submit(secondCallable);
-
- latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
- assertTrue(latchUnderTest.isWaiting());
+ latchUnderTest.setCheckpointId(1L);
+ assertTrue(latchUnderTest.isSet());
+ assertEquals(1L, latchUnderTest.getCheckpointId());
- latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
- assertTrue(latchUnderTest.isCompleted());
+ latchUnderTest.setCheckpointId(2L);
+ assertTrue(latchUnderTest.isSet());
+ assertEquals(1L, latchUnderTest.getCheckpointId());
}
- private static final class WaitingOnLatchCallable implements Callable<Boolean> {
-
- private final SynchronousSavepointLatch latch;
- private final long checkpointId;
-
- WaitingOnLatchCallable(
- final SynchronousSavepointLatch latch,
- final long checkpointId) {
- this.latch = checkNotNull(latch);
- this.checkpointId = checkpointId;
- }
+ private Future<Void> runThreadWaitingForCheckpointAck(SynchronousSavepointLatch latch) {
+ return executors.submit(() -> {
+ latch.blockUntilCheckpointIsAcknowledged();
+ return null;
+ });
+ }
- @Override
- public Boolean call() throws Exception {
- latch.setCheckpointId(checkpointId);
- return latch.blockUntilCheckpointIsAcknowledged();
- }
+ private Future<Void> runThreadTriggeringCheckpoint(SynchronousSavepointLatch latch, long checkpointId, RunnableWithException runnable) {
+ return executors.submit(() -> {
+ latch.acknowledgeCheckpointAndTrigger(checkpointId, runnable);
+ return null;
+ });
}
}