You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:10 UTC
[flink] 07/11: [FLINK-16986][coordination] (part 2) Make
OperatorCoordinatorHolder driven by main thread executor
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37f7db38290df065669178cc6407edd5055f1951
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:19:30 2020 +0200
[FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor
---
.../coordination/OperatorCoordinatorHolder.java | 132 +++--
.../operators/coordination/OperatorEventValve.java | 62 ++-
.../flink/runtime/scheduler/SchedulerBase.java | 2 +-
.../OperatorCoordinatorHolderTest.java | 556 +++++++++++++++++++++
.../coordination/OperatorEventValveTest.java | 46 +-
5 files changed, 725 insertions(+), 73 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 304eb20..9e68c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.operators.coordination;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
@@ -57,19 +59,16 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
- private static final long NO_CHECKPOINT = Long.MIN_VALUE;
-
private final OperatorCoordinator coordinator;
private final OperatorID operatorId;
private final LazyInitializedCoordinatorContext context;
-
private final OperatorEventValve eventValve;
- // these two fields are needed for the construction of OperatorStateHandles when taking checkpoints
private final int operatorParallelism;
private final int operatorMaxParallelism;
- private volatile long currentlyTriggeredCheckpoint;
+ private Consumer<Throwable> globalFailureHandler;
+ private ComponentMainThreadExecutor mainThreadExecutor;
private OperatorCoordinatorHolder(
final OperatorID operatorId,
@@ -85,17 +84,17 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
this.eventValve = checkNotNull(eventValve);
this.operatorParallelism = operatorParallelism;
this.operatorMaxParallelism = operatorMaxParallelism;
-
- this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
}
- public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
- lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor);
+ public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
+ lazyInitialize(scheduler::handleGlobalFailure, mainThreadExecutor);
}
@VisibleForTesting
- void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
- context.lazyInitialize(globalFailureHandler, schedulerExecutor);
+ void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {
+ this.globalFailureHandler = globalFailureHandler;
+ this.mainThreadExecutor = mainThreadExecutor;
+ context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
}
// ------------------------------------------------------------------------
@@ -127,6 +126,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
@Override
public void start() throws Exception {
+ mainThreadExecutor.assertRunningInMainThread();
checkState(context.isInitialized(), "Coordinator Context is not yet initialized");
coordinator.start();
}
@@ -139,78 +139,112 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+ mainThreadExecutor.assertRunningInMainThread();
coordinator.handleEventFromOperator(subtask, event);
}
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+ mainThreadExecutor.assertRunningInMainThread();
coordinator.subtaskFailed(subtask, reason);
eventValve.resetForTask(subtask);
}
@Override
- public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
- setCurrentlyTriggeredCheckpoint(checkpointId);
-
- final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
-
- // synchronously!!!, with the completion, we need to shut the event valve
- checkpointFuture.whenComplete((ignored, failure) -> {
- if (failure != null) {
- abortCurrentTriggering();
- } else {
- onCheckpointStateFutureComplete(checkpointId);
- }
- });
-
- return checkpointFuture;
+ public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+ // unfortunately, this method does not run in the scheduler executor, but in the
+ // checkpoint coordinator timer thread.
+ // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+ // main thread executor
+ final CompletableFuture<byte[]> future = new CompletableFuture<>();
+ mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, future));
+ return future;
}
@Override
public void checkpointComplete(long checkpointId) {
- coordinator.checkpointComplete(checkpointId);
+ // unfortunately, this method does not run in the scheduler executor, but in the
+ // checkpoint coordinator timer thread.
+ // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+ // main thread executor
+ mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId));
}
@Override
public void resetToCheckpoint(byte[] checkpointData) throws Exception {
- resetCheckpointTriggeringCheck();
+ // ideally we would like to check this here, however this method is called early during
+ // execution graph construction, before the main thread executor is set
+
eventValve.reset();
coordinator.resetToCheckpoint(checkpointData);
}
+ private void checkpointCompleteInternal(long checkpointId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ coordinator.checkpointComplete(checkpointId);
+ }
+
+ private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) {
+ mainThreadExecutor.assertRunningInMainThread();
+
+ final CompletableFuture<byte[]> checkpointFuture;
+ try {
+ eventValve.markForCheckpoint(checkpointId);
+ checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ result.completeExceptionally(t);
+ globalFailureHandler.accept(t);
+ return;
+ }
+
+ // synchronously!!!, with the completion, we need to shut the event valve
+ checkpointFuture.whenComplete((success, failure) -> {
+ if (failure != null) {
+ result.completeExceptionally(failure);
+ } else {
+ try {
+ eventValve.shutValve(checkpointId);
+ result.complete(success);
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+ }
+ });
+ }
+
// ------------------------------------------------------------------------
// Checkpointing Callbacks
// ------------------------------------------------------------------------
@Override
public void afterSourceBarrierInjection(long checkpointId) {
- verifyNoOtherCheckpointBeingTriggered(checkpointId);
- eventValve.openValve();
- resetCheckpointTriggeringCheck();
- }
+ // this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
- @Override
- public void abortCurrentTriggering() {
- eventValve.openValve();
- resetCheckpointTriggeringCheck();
- }
+ // we ideally want the scheduler main-thread to be the one that sends the blocked events
+ // however, we need to react synchronously here, to maintain consistency and not allow
+ // another checkpoint injection in-between (unlikely, but possible).
+ // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
+ // thread safe.
- void onCheckpointStateFutureComplete(long checkpointId) {
- verifyNoOtherCheckpointBeingTriggered(checkpointId);
- eventValve.shutValve();
+ // this will automatically be fixed once the checkpoint coordinator runs in the
+ // scheduler's main thread executor
+ eventValve.openValveAndUnmarkCheckpoint();
}
- private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
- checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
- }
+ @Override
+ public void abortCurrentTriggering() {
+ // this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
- private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
- verifyNoOtherCheckpointBeingTriggered(checkpointId);
- currentlyTriggeredCheckpoint = checkpointId;
- }
+ // we ideally want the scheduler main-thread to be the one that sends the blocked events
+ // however, we need to react synchronously here, to maintain consistency and not allow
+ // another checkpoint injection in-between (unlikely, but possible).
+ // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
+ // thread safe.
- private void resetCheckpointTriggeringCheck() {
- currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+ // this will automatically be fixed once the checkpoint coordinator runs in the
+ // scheduler's main thread executor
+ eventValve.openValveAndUnmarkCheckpoint();
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index bc1bfcf..f63df4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
@@ -41,7 +42,9 @@ import java.util.function.BiFunction;
*
* <p>This class is fully thread safe, under the assumption that the event sender is thread-safe.
*/
-public final class OperatorEventValve {
+final class OperatorEventValve {
+
+ private static final long NO_CHECKPOINT = Long.MIN_VALUE;
private final Object lock = new Object();
@@ -52,6 +55,12 @@ public final class OperatorEventValve {
private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
@GuardedBy("lock")
+ private long currentCheckpointId;
+
+ @GuardedBy("lock")
+ private long lastCheckpointId;
+
+ @GuardedBy("lock")
private boolean shut;
/**
@@ -60,6 +69,8 @@ public final class OperatorEventValve {
*/
public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) {
this.eventSender = eventSender;
+ this.currentCheckpointId = NO_CHECKPOINT;
+ this.lastCheckpointId = Long.MIN_VALUE;
}
// ------------------------------------------------------------------------
@@ -96,21 +107,52 @@ public final class OperatorEventValve {
* Shuts the value. All events sent through this valve are blocked until the valve is re-opened.
* If the valve is already shut, this does nothing.
*/
- public void shutValve() {
- // synchronized block for visibility
+ public void markForCheckpoint(long checkpointId) {
synchronized (lock) {
- shut = true;
+ if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+ throw new IllegalStateException(String.format(
+ "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+ checkpointId, currentCheckpointId));
+ }
+ if (checkpointId > lastCheckpointId) {
+ currentCheckpointId = checkpointId;
+ lastCheckpointId = checkpointId;
+ } else {
+ throw new IllegalStateException(String.format(
+ "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+ lastCheckpointId, checkpointId));
+ }
+ }
+ }
+
+ /**
+ * Shuts the valve. All events sent through this valve are blocked until the valve is re-opened.
+ * If the valve is already shut, this does nothing.
+ */
+ public void shutValve(long checkpointId) {
+ synchronized (lock) {
+ if (checkpointId == currentCheckpointId) {
+ shut = true;
+ } else {
+ throw new IllegalStateException(String.format(
+ "Cannot shut valve for non-prepared checkpoint. " +
+ "Prepared checkpoint = %s, attempting-to-close checkpoint = %d",
+ (currentCheckpointId == NO_CHECKPOINT ? "(none)" : String.valueOf(currentCheckpointId)),
+ checkpointId));
+ }
}
}
/**
* Opens the value, releasing all buffered events.
*/
- public void openValve() {
+ public void openValveAndUnmarkCheckpoint() {
final ArrayList<FuturePair> futures;
// send all events under lock, so that no new event can sneak between
synchronized (lock) {
+ currentCheckpointId = NO_CHECKPOINT;
+
if (!shut) {
return;
}
@@ -129,14 +171,7 @@ public final class OperatorEventValve {
// apply the logic on the future outside the lock, to be safe
for (FuturePair pair : futures) {
- final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture;
- pair.ackFuture.whenComplete((success, failure) -> {
- if (failure != null) {
- originalFuture.completeExceptionally(failure);
- } else {
- originalFuture.complete(success);
- }
- });
+ FutureUtils.forward(pair.ackFuture, pair.originalFuture);
}
}
@@ -165,6 +200,7 @@ public final class OperatorEventValve {
}
blockedEvents.clear();
shut = false;
+ currentCheckpointId = NO_CHECKPOINT;
}
failAllFutures(events);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ce1d8a1..1a10d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -970,7 +970,7 @@ public abstract class SchedulerBase implements SchedulerNG {
}
}
- private void initializeOperatorCoordinators(Executor mainThreadExecutor) {
+ private void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) {
for (OperatorCoordinatorHolder coordinatorHolder : getAllCoordinators()) {
coordinatorHolder.lazyInitialize(this, mainThreadExecutor);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
new file mode 100644
index 0000000..fa4e7cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A test that ensures the before/after conditions around event sending and checkpointing are met,
+ * including under concurrency.
+ */
+@SuppressWarnings("serial")
+public class OperatorCoordinatorHolderTest extends TestLogger {
+
+ private final Consumer<Throwable> globalFailureHandler = (t) -> globalFailure = t;
+ private Throwable globalFailure;
+
+ @After
+ public void checkNoGlobalFailure() throws Exception {
+ if (globalFailure != null) {
+ ExceptionUtils.rethrowException(globalFailure);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void checkpointFutureInitiallyNotDone() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(1L);
+ assertFalse(checkpointFuture.isDone());
+ }
+
+ @Test
+ public void completedCheckpointFuture() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ final byte[] testData = new byte[] {11, 22, 33, 44};
+
+ final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(9L);
+ getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
+
+ assertTrue(checkpointFuture.isDone());
+ assertArrayEquals(testData, checkpointFuture.get());
+ }
+
+ @Test
+ public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ holder.checkpointCoordinator(1L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(1), 1)
+ ));
+ }
+
+ @Test
+ public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ triggerAndCompleteCheckpoint(holder, 10L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+
+ assertTrue(sender.events.isEmpty());
+ }
+
+ @Test
+ public void abortedCheckpointReleasesBlockedEvents() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ triggerAndCompleteCheckpoint(holder, 123L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+ holder.abortCurrentTriggering();
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(1337), 0)
+ ));
+ }
+
+ @Test
+ public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ triggerAndCompleteCheckpoint(holder, 1111L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+ holder.afterSourceBarrierInjection(1111L);
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(1337), 0)
+ ));
+ }
+
+ @Test
+ public void failedTasksDropsBlockedEvents() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ triggerAndCompleteCheckpoint(holder, 1000L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+ holder.subtaskFailed(1, null);
+ holder.abortCurrentTriggering();
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(0), 0)
+ ));
+ }
+
+ @Test
+ public void restoreOpensValveEvents() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ triggerAndCompleteCheckpoint(holder, 1000L);
+ holder.resetToCheckpoint(new byte[0]);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(999), 1);
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(999), 1)
+ ));
+ }
+
+ @Test
+ public void restoreDropsBlockedEvents() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ triggerAndCompleteCheckpoint(holder, 1000L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+ holder.resetToCheckpoint(new byte[0]);
+
+ assertTrue(sender.events.isEmpty());
+ }
+
+ @Test
+ public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ final CompletableFuture<byte[]> holderFuture = holder.checkpointCoordinator(1000L);
+ final CompletableFuture<byte[]> future1 = getCoordinator(holder).getLastTriggeredCheckpoint();
+ holder.abortCurrentTriggering();
+
+ triggerAndCompleteCheckpoint(holder, 1010L);
+ holder.afterSourceBarrierInjection(1010L);
+
+ future1.complete(new byte[0]);
+
+ assertTrue(holderFuture.isCompletedExceptionally());
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0);
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(123), 0)
+ ));
+ }
+
+ @Test
+ public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ holder.checkpointCoordinator(11L);
+ final CompletableFuture<?> future = holder.checkpointCoordinator(12L);
+
+ assertTrue(future.isCompletedExceptionally());
+ assertNotNull(globalFailure);
+ globalFailure = null;
+ }
+
+ @Test
+ public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+
+ triggerAndCompleteCheckpoint(holder, 22L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
+ holder.afterSourceBarrierInjection(22L);
+
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);
+
+ triggerAndCompleteCheckpoint(holder, 23L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
+ holder.afterSourceBarrierInjection(23L);
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(0), 0),
+ new EventWithSubtask(new TestOperatorEvent(1), 0),
+ new EventWithSubtask(new TestOperatorEvent(2), 0),
+ new EventWithSubtask(new TestOperatorEvent(3), 0)
+ ));
+ }
+
+ @Test
+ public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+
+ triggerAndCompleteCheckpoint(holder, 22L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
+ holder.abortCurrentTriggering();
+
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);
+
+ triggerAndCompleteCheckpoint(holder, 23L);
+ getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
+ holder.afterSourceBarrierInjection(23L);
+
+ assertThat(sender.events, contains(
+ new EventWithSubtask(new TestOperatorEvent(0), 0),
+ new EventWithSubtask(new TestOperatorEvent(1), 0),
+ new EventWithSubtask(new TestOperatorEvent(2), 0),
+ new EventWithSubtask(new TestOperatorEvent(3), 0)
+ ));
+ }
+
+ /**
+ * This test verifies that the order of Checkpoint Completion and Event Sending observed from the
+ * outside matches that from within the OperatorCoordinator.
+ *
+ * <p>Extreme case 1: The coordinator immediately completes the checkpoint future and sends an
+ * event directly after that.
+ */
+ @Test
+ public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
+ checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new);
+ }
+
+ /**
+ * This test verifies that the order of Checkpoint Completion and Event Sending observed from the
+ * outside matches that from within the OperatorCoordinator.
+ *
+ * <p>Extreme case 2: After the checkpoint triggering, the coordinator flushes a bunch of events
+ * before completing the checkpoint future.
+ */
+ @Test
+ public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
+ checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new);
+ }
+
+ private void checkpointEventValueAtomicity(
+ final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
+
+ final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
+ final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(
+ (ScheduledExecutorService) executor, Thread.currentThread());
+
+ final TestEventSender sender = new TestEventSender();
+ final OperatorCoordinatorHolder holder = createCoordinatorHolder(
+ sender, coordinatorCtor, mainThreadExecutor);
+
+ // give the coordinator some time to emit some events
+ Thread.sleep(new Random().nextInt(10) + 20);
+ executor.triggerAll();
+
+ // trigger the checkpoint - this should also shut the valve as soon as the future is completed
+ final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L);
+ executor.triggerAll();
+
+ // give the coordinator some time to emit some events
+ Thread.sleep(new Random().nextInt(10) + 10);
+ holder.close();
+ executor.triggerAll();
+
+ assertTrue(checkpointFuture.isDone());
+ final int checkpointedNumber = bytesToInt(checkpointFuture.get());
+
+ assertEquals(checkpointedNumber, sender.events.size());
+ for (int i = 0; i < checkpointedNumber; i++) {
+ assertEquals(i, ((TestOperatorEvent) sender.events.get(i).event).getValue());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test actions
+ // ------------------------------------------------------------------------
+
+ private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(
+ OperatorCoordinatorHolder holder,
+ long checkpointId) throws Exception {
+
+ final CompletableFuture<byte[]> future = holder.checkpointCoordinator(checkpointId);
+ getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+ return future;
+ }
+
+ // ------------------------------------------------------------------------
+ // miscellaneous helpers
+ // ------------------------------------------------------------------------
+
+ static byte[] intToBytes(int value) {
+ return ByteBuffer.allocate(4).putInt(value).array();
+ }
+
+ static int bytesToInt(byte[] bytes) {
+ return ByteBuffer.wrap(bytes).getInt();
+ }
+
+ private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) {
+ return (TestingOperatorCoordinator) holder.coordinator();
+ }
+
+ private OperatorCoordinatorHolder createCoordinatorHolder(
+ final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+ final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
+
+ return createCoordinatorHolder(
+ eventSender,
+ coordinatorCtor,
+ ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ }
+
+ private OperatorCoordinatorHolder createCoordinatorHolder(
+ final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+ final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor,
+ final ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
+
+ final OperatorID opId = new OperatorID();
+ final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
+ @Override
+ public OperatorID getOperatorId() {
+ return opId;
+ }
+
+ @Override
+ public OperatorCoordinator create(OperatorCoordinator.Context context) {
+ return coordinatorCtor.apply(context);
+ }
+ };
+
+ final OperatorCoordinatorHolder holder = OperatorCoordinatorHolder.create(
+ opId,
+ provider,
+ eventSender,
+ "test-coordinator-name",
+ 3,
+ 1775);
+
+ holder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
+ holder.start();
+
+ return holder;
+ }
+
+ // ------------------------------------------------------------------------
+ // test implementations
+ // ------------------------------------------------------------------------
+
+ /**
+  * Test coordinator in which the checkpoint future is completed, and one more event is sent,
+  * while the caller of {@link #checkpointCoordinator(long)} is still waiting inside that method.
+  *
+  * <p>Both methods use the same lock: the triggering call stores the future and waits on the
+  * condition; {@link #step()} completes the stored future, sends the next event, and signals
+  * the condition, all within a single lock acquisition.
+  */
+ private static final class FutureCompletedInstantlyTestCoordinator extends CheckpointEventOrderTestBaseCoordinator {
+
+ // created as a fair lock (constructor argument 'true')
+ private final ReentrantLock lock = new ReentrantLock(true);
+ // waited on in checkpointCoordinator(), signalled at the end of every step()
+ private final Condition condition = lock.newCondition();
+
+ // the future created by the latest checkpointCoordinator() call, until step() completes it
+ @Nullable
+ @GuardedBy("lock")
+ private CompletableFuture<byte[]> checkpoint;
+
+ // payload of the next event; also serialized as the checkpoint result in step()
+ private int num;
+
+ FutureCompletedInstantlyTestCoordinator(Context context) {
+ super(context);
+ }
+
+ @Override
+ public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+ // we create the checkpoint future, but before returning it, we wait on a
+ // condition. that way, we simulate a "context switch" just at the time when the
+ // future would be returned and make the other thread complete the future and send an
+ // event before this method returns
+ final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+ lock.lock();
+ try {
+ checkpoint = checkpointFuture;
+ // NOTE(review): await() is not wrapped in a loop, so a spurious wakeup would let this
+ // method return before step() has completed the future
+ condition.await();
+ return checkpointFuture;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ protected void step() throws Exception {
+ lock.lock();
+ try {
+ // if there is a checkpoint to complete, we complete it and immediately
+ // try to send another event, without releasing the lock. that way we
+ // force the situation as if the checkpoint gets completed and an event gets
+ // sent while the triggering thread is stalled
+ if (checkpoint != null) {
+ checkpoint.complete(intToBytes(num));
+ checkpoint = null;
+ }
+ context.sendEvent(new TestOperatorEvent(num++), 0);
+ // wake up a thread waiting in checkpointCoordinator(); it can only proceed once
+ // the lock is released in the finally block below
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+
+ // short pause outside the lock before the next step
+ Thread.sleep(2);
+ }
+ }
+
+ /**
+  * Test coordinator that, in every step, first sends one event to each of the subtasks 0, 1
+  * and 2 and only afterwards completes a pending checkpoint future.
+  *
+  * <p>{@link #step()} runs on the coordinator thread started by the base class, whereas
+  * {@link #checkpointCoordinator(long)} is invoked by whichever thread triggers the checkpoint.
+  * The {@code checkpoint} field is the hand-over point between those threads and is therefore
+  * declared {@code volatile}.
+  */
+ private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator {
+
+     // written by the triggering thread, read and cleared by the coordinator thread
+     @Nullable
+     private volatile CompletableFuture<byte[]> checkpoint;
+
+     // payload of the next event; only touched by the coordinator thread in step()
+     private int num;
+
+     FutureCompletedAfterSendingEventsCoordinator(Context context) {
+         super(context);
+     }
+
+     /**
+      * Creates a new, uncompleted future, publishes it for {@link #step()} to complete, and
+      * returns it right away.
+      */
+     @Override
+     public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+         final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+         checkpoint = checkpointFuture;
+         return checkpointFuture;
+     }
+
+     /**
+      * Sleeps briefly, sends three events (to subtasks 0, 1, 2), then completes the pending
+      * checkpoint future, if any, with the current value of {@code num}.
+      */
+     @Override
+     protected void step() throws Exception {
+         Thread.sleep(2);
+
+         context.sendEvent(new TestOperatorEvent(num++), 0);
+         context.sendEvent(new TestOperatorEvent(num++), 1);
+         context.sendEvent(new TestOperatorEvent(num++), 2);
+
+         // read the shared field exactly once so the null check and the completion act on
+         // the same future
+         final CompletableFuture<byte[]> pending = checkpoint;
+         if (pending != null) {
+             // clear the slot before completing: once the future is complete, the triggering
+             // side may start another checkpoint, and its future must not be overwritten
+             checkpoint = null;
+             pending.complete(intToBytes(num));
+         }
+     }
+ }
+
+ /**
+  * Base class for test coordinators that drive their logic from a dedicated thread.
+  *
+  * <p>{@link #start()} launches the thread, which calls {@link #step()} in a loop until
+  * {@link #close()} sets the closed flag, interrupts the thread and waits for it to finish.
+  * A failure in {@link #step()} while the coordinator is not closed prints the stack trace
+  * and terminates the JVM.
+  */
+ private abstract static class CheckpointEventOrderTestBaseCoordinator implements OperatorCoordinator, Runnable {
+
+     protected final Context context;
+
+     private final Thread coordinatorThread;
+
+     private volatile boolean closed;
+
+     CheckpointEventOrderTestBaseCoordinator(Context context) {
+         this.context = context;
+         this.coordinatorThread = new Thread(this);
+     }
+
+     // ------------------------------------------------------------------------
+     //  life cycle
+     // ------------------------------------------------------------------------
+
+     @Override
+     public void start() throws Exception {
+         coordinatorThread.start();
+     }
+
+     @Override
+     public void close() throws Exception {
+         // flag first, so that the interrupt below is recognized as a regular shutdown
+         closed = true;
+         coordinatorThread.interrupt();
+         coordinatorThread.join();
+     }
+
+     @Override
+     public void run() {
+         try {
+             while (!closed) {
+                 step();
+             }
+         } catch (Throwable t) {
+             if (!closed) {
+                 // this should never happen, but just in case, print and crash the test
+                 //noinspection CallToPrintStackTrace
+                 t.printStackTrace();
+                 System.exit(-1);
+             }
+             // failures after close() (for example the interrupt) are ignored
+         }
+     }
+
+     /** One iteration of the coordinator thread's loop. */
+     protected abstract void step() throws Exception;
+
+     // ------------------------------------------------------------------------
+     //  coordinator callbacks
+     // ------------------------------------------------------------------------
+
+     @Override
+     public abstract CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+
+     @Override
+     public void handleEventFromOperator(int subtask, OperatorEvent event) {}
+
+     @Override
+     public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
+
+     @Override
+     public void checkpointComplete(long checkpointId) {}
+
+     @Override
+     public void resetToCheckpoint(byte[] checkpointData) throws Exception {}
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
index 9b0325d..a380958 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -50,11 +50,30 @@ public class OperatorEventValveTest {
assertTrue(future.isDone());
}
+ /** Shutting a valve on which no checkpoint was marked is expected to fail. */
+ @Test(expected = IllegalStateException.class)
+ public void errorShuttingUnmarkedValve() throws Exception {
+     final OperatorEventValve unmarkedValve = new OperatorEventValve(new TestEventSender());
+
+     unmarkedValve.shutValve(123L);
+ }
+
+ /** Shutting a valve for a checkpoint ID other than the marked one is expected to fail. */
+ @Test(expected = IllegalStateException.class)
+ public void errorShuttingValveForOtherMark() throws Exception {
+     final OperatorEventValve markedValve = new OperatorEventValve(new TestEventSender());
+     markedValve.markForCheckpoint(100L);
+
+     markedValve.shutValve(123L);
+ }
+
@Test
public void eventsBlockedByClosedValve() throws Exception {
final TestEventSender sender = new TestEventSender();
final OperatorEventValve valve = new OperatorEventValve(sender);
- valve.shutValve();
+
+ valve.markForCheckpoint(1L);
+ valve.shutValve(1L);
final CompletableFuture<Acknowledge> future =
valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
@@ -67,14 +86,16 @@ public class OperatorEventValveTest {
public void eventsReleasedAfterOpeningValve() throws Exception {
final TestEventSender sender = new TestEventSender();
final OperatorEventValve valve = new OperatorEventValve(sender);
- valve.shutValve();
+
+ valve.markForCheckpoint(17L);
+ valve.shutValve(17L);
final OperatorEvent event1 = new TestOperatorEvent();
final OperatorEvent event2 = new TestOperatorEvent();
final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 3);
final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 0);
- valve.openValve();
+ valve.openValveAndUnmarkCheckpoint();
assertThat(sender.events, containsInAnyOrder(
new EventWithSubtask(event1, 3),
@@ -88,11 +109,13 @@ public class OperatorEventValveTest {
public void releasedEventsForwardSendFailures() throws Exception {
final TestEventSender sender = new TestEventSender(new FlinkException("test"));
final OperatorEventValve valve = new OperatorEventValve(sender);
- valve.shutValve();
+
+ valve.markForCheckpoint(17L);
+ valve.shutValve(17L);
final CompletableFuture<Acknowledge> future =
valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
- valve.openValve();
+ valve.openValveAndUnmarkCheckpoint();
assertTrue(future.isCompletedExceptionally());
}
@@ -101,13 +124,14 @@ public class OperatorEventValveTest {
public void resetDropsAllEvents() throws Exception {
final TestEventSender sender = new TestEventSender();
final OperatorEventValve valve = new OperatorEventValve(sender);
- valve.shutValve();
+ valve.markForCheckpoint(17L);
+ valve.shutValve(17L);
valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
valve.reset();
- valve.openValve();
+ valve.openValveAndUnmarkCheckpoint();
assertTrue(sender.events.isEmpty());
}
@@ -116,7 +140,8 @@ public class OperatorEventValveTest {
public void resetForTaskDropsSelectiveEvents() throws Exception {
final TestEventSender sender = new TestEventSender();
final OperatorEventValve valve = new OperatorEventValve(sender);
- valve.shutValve();
+ valve.markForCheckpoint(17L);
+ valve.shutValve(17L);
final OperatorEvent event1 = new TestOperatorEvent();
final OperatorEvent event2 = new TestOperatorEvent();
@@ -124,7 +149,7 @@ public class OperatorEventValveTest {
final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 1);
valve.resetForTask(1);
- valve.openValve();
+ valve.openValveAndUnmarkCheckpoint();
assertThat(sender.events, contains(new EventWithSubtask(event1, 0)));
assertTrue(future1.isDone());
@@ -136,7 +161,8 @@ public class OperatorEventValveTest {
final TestEventSender sender = new TestEventSender();
final OperatorEventValve valve = new OperatorEventValve(sender);
- valve.shutValve();
+ valve.markForCheckpoint(17L);
+ valve.shutValve(17L);
valve.reset();
assertFalse(valve.isShut());