You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:07 UTC
[flink] 04/11: [FLINK-16986][coordination] (part 1) Provide
exactly-once guarantees around checkpoints and operator event sending.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a721d855998a7c91415312d4890d7d4f916e163
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 18 01:46:32 2020 +0200
[FLINK-16986][coordination] (part 1) Provide exactly-once guarantees around checkpoints and operator event sending.
This closes #12234
---
.../runtime/checkpoint/CheckpointCoordinator.java | 8 +-
.../OperatorCoordinatorCheckpointContext.java | 50 +-
.../checkpoint/OperatorCoordinatorCheckpoints.java | 12 +-
.../runtime/executiongraph/ExecutionGraph.java | 9 +-
.../coordination/OperatorCoordinatorHolder.java | 109 ++++-
.../operators/coordination/OperatorEventValve.java | 209 +++++++++
.../flink/runtime/scheduler/SchedulerBase.java | 4 +-
.../runtime/checkpoint/PendingCheckpointTest.java | 8 +-
.../CoordinatorEventsExactlyOnceITCase.java | 501 +++++++++++++++++++++
.../MockOperatorCoordinatorCheckpointContext.java} | 60 +--
.../OperatorCoordinatorSchedulerTest.java | 4 +-
.../coordination/OperatorEventValveTest.java | 144 ++++++
.../operators/coordination/TestEventSender.java | 113 +++++
.../operators/coordination/TestOperatorEvent.java | 33 ++
14 files changed, 1175 insertions(+), 89 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b283a2a..51b98f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -545,13 +545,17 @@ public class CheckpointCoordinator {
if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
// no exception, no discarding, everything is OK
+ final long checkpointId = checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
- checkpoint.getCheckpointId(),
+ checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
executions,
request.advanceToEndOfTime);
+
+ coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
onTriggerSuccess();
} else {
// the initialization might not be finished yet
@@ -777,6 +781,8 @@ public class CheckpointCoordinator {
throwable = ExceptionUtils.stripCompletionException(throwable);
try {
+ coordinatorsToCheckpoint.forEach(OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
+
if (checkpoint != null && !checkpoint.isDiscarded()) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 16acbb2..92fd2aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -24,51 +24,41 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import java.util.Collection;
import java.util.stream.Collectors;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* An {@link OperatorCoordinator} and its contextual information needed to trigger and
* acknowledge a checkpoint.
*/
-public final class OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext {
- private final OperatorCoordinator coordinator;
+ // ------------------------------------------------------------------------
+ // properties
+ // ------------------------------------------------------------------------
- private final OperatorID operatorId;
+ OperatorCoordinator coordinator();
- private final int maxParallelism;
+ OperatorID operatorId();
- private final int currentParallelism;
+ int maxParallelism();
- public OperatorCoordinatorCheckpointContext(
- OperatorCoordinator coordinator,
- OperatorID operatorId,
- int maxParallelism,
- int currentParallelism) {
+ int currentParallelism();
- this.coordinator = checkNotNull(coordinator);
- this.operatorId = checkNotNull(operatorId);
- this.maxParallelism = maxParallelism;
- this.currentParallelism = currentParallelism;
- }
+ // ------------------------------------------------------------------------
+ // checkpoint triggering callbacks
+ // ------------------------------------------------------------------------
- public OperatorCoordinator coordinator() {
- return coordinator;
- }
+ void onCallTriggerCheckpoint(long checkpointId);
- public OperatorID operatorId() {
- return operatorId;
- }
+ void onCheckpointStateFutureComplete(long checkpointId);
- public int maxParallelism() {
- return maxParallelism;
- }
+ void afterSourceBarrierInjection(long checkpointId);
- public int currentParallelism() {
- return currentParallelism;
- }
+ void abortCurrentTriggering();
+
+ // ------------------------------------------------------------------------
+ // utils
+ // ------------------------------------------------------------------------
- public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+ static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
return infos.stream()
.map(OperatorCoordinatorCheckpointContext::operatorId)
.collect(Collectors.toList());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 39ac10f..68fc8f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -58,7 +58,17 @@ final class OperatorCoordinatorCheckpoints {
final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
- individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+ final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
+ coordinator.onCallTriggerCheckpoint(checkpointId);
+
+ individualSnapshots.add(checkpointFuture);
+ checkpointFuture.whenComplete((ignored, failure) -> {
+ if (failure != null) {
+ coordinator.abortCurrentTriggering();
+ } else {
+ coordinator.onCheckpointStateFutureComplete(checkpointId);
+ }
+ });
}
return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b84a86f..dfd20ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -569,13 +568,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
- for (final OperatorCoordinatorHolder coordinator : vertex.getOperatorCoordinators()) {
- contexts.add(new OperatorCoordinatorCheckpointContext(
- coordinator,
- coordinator.getOperatorId(),
- vertex.getMaxParallelism(),
- vertex.getParallelism()));
- }
+ contexts.addAll(vertex.getOperatorCoordinators());
}
contexts.trimToSize();
return contexts;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 43c47fe..c70e229 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -51,34 +53,99 @@ import static org.apache.flink.util.Preconditions.checkState;
* However, the real Coordinators can only be created after SchedulerNG was created, because they need
* a reference to it for the failure calls.
*/
-public class OperatorCoordinatorHolder implements OperatorCoordinator {
+public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
+
+ private static final long NO_CHECKPOINT = Long.MIN_VALUE;
private final OperatorCoordinator coordinator;
private final OperatorID operatorId;
private final LazyInitializedCoordinatorContext context;
+ private final OperatorEventValve eventValve;
+
+ // these two fields are needed for the construction of OperatorStateHandles when taking checkpoints
+ private final int operatorParallelism;
+ private final int operatorMaxParallelism;
+
+ private long currentlyTriggeredCheckpoint;
+
private OperatorCoordinatorHolder(
final OperatorID operatorId,
final OperatorCoordinator coordinator,
- final LazyInitializedCoordinatorContext context) {
+ final LazyInitializedCoordinatorContext context,
+ final OperatorEventValve eventValve,
+ final int operatorParallelism,
+ final int operatorMaxParallelism) {
this.operatorId = checkNotNull(operatorId);
this.coordinator = checkNotNull(coordinator);
this.context = checkNotNull(context);
+ this.eventValve = checkNotNull(eventValve);
+ this.operatorParallelism = operatorParallelism;
+ this.operatorMaxParallelism = operatorMaxParallelism;
+
+ this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
}
+ public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+ context.lazyInitialize(scheduler, schedulerExecutor);
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
// ------------------------------------------------------------------------
- public OperatorID getOperatorId() {
+ @Override
+ public OperatorID operatorId() {
return operatorId;
}
- public OperatorCoordinator getCoordinator() {
+ @Override
+ public OperatorCoordinator coordinator() {
return coordinator;
}
- public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
- context.lazyInitialize(scheduler, schedulerExecutor);
+ @Override
+ public int maxParallelism() {
+ return operatorMaxParallelism;
+ }
+
+ @Override
+ public int currentParallelism() {
+ return operatorParallelism;
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpointing Callbacks
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void onCallTriggerCheckpoint(long checkpointId) {
+ checkCheckpointAlreadyHappening(checkpointId);
+ currentlyTriggeredCheckpoint = checkpointId;
+ }
+
+ @Override
+ public void onCheckpointStateFutureComplete(long checkpointId) {
+ checkCheckpointAlreadyHappening(checkpointId);
+ eventValve.shutValve();
+ }
+
+ @Override
+ public void afterSourceBarrierInjection(long checkpointId) {
+ checkCheckpointAlreadyHappening(checkpointId);
+ eventValve.openValve();
+ currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+ }
+
+ @Override
+ public void abortCurrentTriggering() {
+ eventValve.openValve();
+ currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+ }
+
+ private void checkCheckpointAlreadyHappening(long checkpointId) {
+ checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
}
// ------------------------------------------------------------------------
@@ -105,6 +172,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
coordinator.subtaskFailed(subtask, reason);
+ eventValve.resetForTask(subtask);
}
@Override
@@ -134,9 +202,24 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
final OperatorID opId = provider.getOperatorId();
- final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex);
+
+ final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender =
+ (serializedEvent, subtask) -> {
+ final Execution executionAttempt = jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
+ return executionAttempt.sendOperatorEvent(opId, serializedEvent);
+ };
+
+ final OperatorEventValve valve = new OperatorEventValve(eventSender);
+ final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve);
final OperatorCoordinator coordinator = provider.create(context);
- return new OperatorCoordinatorHolder(opId, coordinator, context);
+
+ return new OperatorCoordinatorHolder(
+ opId,
+ coordinator,
+ context,
+ valve,
+ jobVertex.getParallelism(),
+ jobVertex.getMaxParallelism());
}
}
@@ -157,13 +240,18 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
private final OperatorID operatorId;
private final ExecutionJobVertex jobVertex;
+ private final OperatorEventValve eventValve;
private SchedulerNG scheduler;
private Executor schedulerExecutor;
- public LazyInitializedCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
+ public LazyInitializedCoordinatorContext(
+ OperatorID operatorId,
+ ExecutionJobVertex jobVertex,
+ OperatorEventValve eventValve) {
this.operatorId = checkNotNull(operatorId);
this.jobVertex = checkNotNull(jobVertex);
+ this.eventValve = checkNotNull(eventValve);
}
void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
@@ -208,8 +296,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
throw new FlinkRuntimeException("Cannot serialize operator event", e);
}
- final Execution executionAttempt = jobVertex.getTaskVertices()[targetSubtask].getCurrentExecutionAttempt();
- return executionAttempt.sendOperatorEvent(operatorId, serializedEvent);
+ return eventValve.sendEvent(serializedEvent, targetSubtask);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
new file mode 100644
index 0000000..bc1bfcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * The event valve is the connection through which operator events are sent, from coordinator to
+ * operator. It can temporarily block events from going through, buffering them, and releasing them
+ * later.
+ *
+ * <p>The valve can also drop buffered events for all or selected targets.
+ *
+ * <p>This class is fully thread safe, under the assumption that the event sender is thread-safe (it is invoked while holding the internal lock).
+ */
+public final class OperatorEventValve {
+
+ private final Object lock = new Object();
+ // Sends a serialized event to the given target subtask index; only ever invoked while holding 'lock'.
+ @GuardedBy("lock")
+ private final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender;
+ // Events held back while the valve is shut, grouped per target subtask (in insertion order).
+ @GuardedBy("lock")
+ private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
+ // True while events are buffered instead of being sent.
+ @GuardedBy("lock")
+ private boolean shut;
+
+ /**
+ * Constructs a new OperatorEventValve, passing the events to the given function when the valve is open
+ * or opened again. The second parameter of the BiFunction is the target operator subtask index.
+ */
+ public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) {
+ this.eventSender = eventSender;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public boolean isShut() {
+ // synchronized block for visibility
+ synchronized (lock) {
+ return shut;
+ }
+ }
+
+ /**
+ * Sends the event directly if the valve is open, and returns the original sending result future.
+ *
+ * <p>If the valve is closed this buffers the event and returns an incomplete future. The future is completed
+ * with the original result once the valve is opened. If the event is never sent (because it gets dropped
+ * through a call to {@link #reset()} or {@link #resetForTask(int)}), then the returned future will be
+ * completed exceptionally.
+ */
+ public CompletableFuture<Acknowledge> sendEvent(SerializedValue<OperatorEvent> event, int subtask) {
+ synchronized (lock) {
+ if (!shut) {
+ return eventSender.apply(event, subtask);
+ }
+
+ final List<BlockedEvent> eventsForTask = blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
+ final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+ eventsForTask.add(new BlockedEvent(event, subtask, future));
+ return future;
+ }
+ }
+
+ /**
+ * Shuts the valve. All events sent through this valve are blocked until the valve is re-opened.
+ * If the valve is already shut, this does nothing.
+ */
+ public void shutValve() {
+ // synchronized block for visibility
+ synchronized (lock) {
+ shut = true;
+ }
+ }
+
+ /**
+ * Opens the valve, releasing all buffered events. If the valve is already open, this does nothing.
+ */
+ public void openValve() {
+ final ArrayList<FuturePair> futures;
+
+ // send all buffered events under the lock, so that no newly sent event can overtake them
+ synchronized (lock) {
+ if (!shut) {
+ return;
+ }
+
+ futures = new ArrayList<>(blockedEvents.size()); // initial capacity is the number of subtasks, not events
+ // NOTE(review): if eventSender throws here, the buffer is not cleared and 'shut' stays true, so a later openValve() would re-send already-sent events
+ for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
+ for (BlockedEvent blockedEvent : eventsForTask) {
+ final CompletableFuture<Acknowledge> ackFuture = eventSender.apply(blockedEvent.event, blockedEvent.subtask);
+ futures.add(new FuturePair(blockedEvent.future, ackFuture));
+ }
+ }
+ blockedEvents.clear();
+ shut = false;
+ }
+
+ // forward the send results outside the lock, so callbacks attached to the returned futures never run under it
+ for (FuturePair pair : futures) {
+ final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture;
+ pair.ackFuture.whenComplete((success, failure) -> {
+ if (failure != null) {
+ originalFuture.completeExceptionally(failure);
+ } else {
+ originalFuture.complete(success);
+ }
+ });
+ }
+ }
+
+ /**
+ * Drops all blocked events for a specific subtask, failing their futures. Does not change whether the valve is shut.
+ */
+ public void resetForTask(int subtask) {
+ final List<BlockedEvent> events;
+ synchronized (lock) {
+ events = blockedEvents.remove(subtask);
+ }
+
+ failAllFutures(events);
+ }
+
+ /**
+ * Resets the valve, dropping all blocked events (failing their futures) and opening the valve.
+ */
+ public void reset() {
+ final List<BlockedEvent> events = new ArrayList<>();
+ synchronized (lock) {
+ for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
+ if (taskEvents != null) {
+ events.addAll(taskEvents);
+ }
+ }
+ blockedEvents.clear();
+ shut = false;
+ }
+
+ failAllFutures(events);
+ }
+ // Completes the future of every given event exceptionally (one shared exception); null or empty input is a no-op.
+ private static void failAllFutures(@Nullable List<BlockedEvent> events) {
+ if (events == null || events.isEmpty()) {
+ return;
+ }
+
+ final Exception failureCause = new FlinkException("Event discarded due to failure of target task");
+ for (BlockedEvent evt : events) {
+ evt.future.completeExceptionally(failureCause);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class BlockedEvent {
+ // An event held back while the valve is shut, plus the future that was handed out to the sender.
+ final SerializedValue<OperatorEvent> event;
+ final CompletableFuture<Acknowledge> future;
+ final int subtask;
+
+ BlockedEvent(SerializedValue<OperatorEvent> event, int subtask, CompletableFuture<Acknowledge> future) {
+ this.event = event;
+ this.future = future;
+ this.subtask = subtask;
+ }
+ }
+
+ private static final class FuturePair {
+ // Links the future returned to the caller (original) with the future of the actual send (ack).
+ final CompletableFuture<Acknowledge> originalFuture;
+ final CompletableFuture<Acknowledge> ackFuture;
+
+ FuturePair(CompletableFuture<Acknowledge> originalFuture, CompletableFuture<Acknowledge> ackFuture) {
+ this.originalFuture = originalFuture;
+ this.ackFuture = ackFuture;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 6f2ae47..ce1d8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -962,7 +962,7 @@ public abstract class SchedulerBase implements SchedulerNG {
throw new FlinkException("Coordinator of operator " + operator + " does not exist");
}
- final OperatorCoordinator coordinator = coordinatorHolder.getCoordinator();
+ final OperatorCoordinator coordinator = coordinatorHolder.coordinator();
if (coordinator instanceof CoordinationRequestHandler) {
return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
} else {
@@ -1002,7 +1002,7 @@ public abstract class SchedulerBase implements SchedulerNG {
Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
- coordinatorMap.put(holder.getOperatorId(), holder);
+ coordinatorMap.put(holder.operatorId(), holder);
}
}
return coordinatorMap;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index ee292b0..b454633 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -563,11 +563,7 @@ public class PendingCheckpointTest {
}
private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
- return new OperatorCoordinatorCheckpointContext(
- new MockOperatorCoordinator(),
- new OperatorID(),
- 256,
- 50);
+ return new MockOperatorCoordinatorCheckpointContext();
}
@SuppressWarnings("unchecked")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
new file mode 100644
index 0000000..919e6fe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration Test case that validates the exactly-once mechanism for coordinator events
+ * around checkpoints.
+ *
+ * <p>The test provokes the corner cases of the mechanism described in {@link OperatorCoordinatorHolder}.
+ * <pre>
+ * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
+ * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
+ * </pre>
+ *
+ * <p>The test generates two sequences of events from two Operator Coordinators to two operators (tasks).
+ * The event sequences have a different speed in which they are sent. The coordinators have different
+ * delays in which they complete their checkpoints. Both coordinators inject failures at different points.
+ */
+@SuppressWarnings("serial")
+public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
+
+ /** Key in the task configuration under which each task finds the name of its result accumulator. */
+ private static final ConfigOption<String> ACC_NAME = ConfigOptions.key("acc").stringType().noDefaultValue();
+ /** Names of the accumulators through which the two tasks report the numbers they collected. */
+ private static final String OPERATOR_1_ACCUMULATOR = "op-acc-1";
+ private static final String OPERATOR_2_ACCUMULATOR = "op-acc-2";
+
+ /** Shared cluster, started once before and closed once after all tests of this class. */
+ private static MiniCluster miniCluster;
+
+	/**
+	 * Starts the shared mini cluster: two TaskManagers with one slot each, so each of the two
+	 * single-parallel test tasks gets its own TaskManager slot.
+	 */
+	@BeforeClass
+	public static void startMiniCluster() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		// port "0" presumably means "any free port" for the REST endpoint
+		flinkConfig.setString(RestOptions.BIND_PORT, "0");
+
+		miniCluster = new MiniCluster(
+			new MiniClusterConfiguration.Builder()
+				.setConfiguration(flinkConfig)
+				.setNumTaskManagers(2)
+				.setNumSlotsPerTaskManager(1)
+				.build());
+		miniCluster.start();
+	}
+
+	/**
+	 * Closes the shared mini cluster.
+	 *
+	 * <p>The field is still {@code null} if creating the cluster in {@link #startMiniCluster()} threw.
+	 * In that case there is nothing to close, and skipping it avoids masking the setup failure with
+	 * a {@link NullPointerException}.
+	 */
+	@AfterClass
+	public static void shutdownMiniCluster() throws Exception {
+		if (miniCluster != null) {
+			miniCluster.close();
+		}
+	}
+
+ // ------------------------------------------------------------------------
+
+	@Test
+	public void test() throws Exception {
+		// coordinator 1 sends many events quickly, coordinator 2 sends few events slowly
+		final int numEvents1 = 200;
+		final int delay1 = 1;
+		final int numEvents2 = 5;
+		final int delay2 = 200;
+
+		final JobVertex task1 = buildJobVertex("TASK_1", numEvents1, delay1, OPERATOR_1_ACCUMULATOR);
+		final JobVertex task2 = buildJobVertex("TASK_2", numEvents2, delay2, OPERATOR_2_ACCUMULATOR);
+
+		final JobGraph jobGraph = new JobGraph("Coordinator Events Job", task1, task2);
+		jobGraph.setSnapshotSettings(createCheckpointSettings(task1, task2));
+
+		final JobExecutionResult jobResult = miniCluster.executeJobBlocking(jobGraph);
+
+		// each task must have received every number exactly once and in order, despite the failures
+		final List<Integer> receivedByTask2 = jobResult.getAccumulatorResult(OPERATOR_2_ACCUMULATOR);
+		checkListContainsSequence(receivedByTask2, numEvents2);
+		final List<Integer> receivedByTask1 = jobResult.getAccumulatorResult(OPERATOR_1_ACCUMULATOR);
+		checkListContainsSequence(receivedByTask1, numEvents1);
+	}
+
+	/**
+	 * Asserts that {@code ints} is exactly the sequence {@code 0, 1, ..., length - 1}.
+	 *
+	 * @param ints the collected numbers; {@code null} if the accumulator was not present in the job
+	 *             result, which is reported as a descriptive test failure rather than an NPE
+	 * @param length the expected number of elements
+	 */
+	private static void checkListContainsSequence(@Nullable List<Integer> ints, int length) {
+		if (ints == null) {
+			throw new AssertionError("Expected a sequence of " + length
+				+ " elements, but the accumulator was missing from the job result.");
+		}
+		if (ints.size() != length) {
+			failList(ints, length);
+		}
+
+		int nextExpected = 0;
+		for (int next : ints) {
+			if (next != nextExpected++) {
+				failList(ints, length);
+			}
+		}
+	}
+
+	/** Fails the test, printing the expected length and the actual list. */
+	private static void failList(List<Integer> ints, int length) {
+		fail("List did not contain expected sequence of " + length + " elements, but was: " + ints);
+	}
+
+ // ------------------------------------------------------------------------
+ // test setup helpers
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Creates a single-parallel vertex that runs an {@link EventCollectingTask}. The vertex is paired
+	 * with an {@link EventSendingCoordinator} that sends {@code numEvents} events, one every
+	 * {@code delay} milliseconds. The task publishes what it received under the accumulator {@code accName}.
+	 */
+	private static JobVertex buildJobVertex(String name, int numEvents, int delay, String accName) throws IOException {
+		final JobVertex jobVertex = new JobVertex(name);
+		// the task derives the same operator ID from its job vertex ID (see EventCollectingTask)
+		final OperatorID operatorId = OperatorID.fromJobVertexID(jobVertex.getID());
+
+		jobVertex.setParallelism(1);
+		jobVertex.setInvokableClass(EventCollectingTask.class);
+		jobVertex.getConfiguration().setString(ACC_NAME, accName);
+
+		final OperatorCoordinator.Provider coordinatorProvider = new OperatorCoordinator.Provider() {
+
+			@Override
+			public OperatorID getOperatorId() {
+				return operatorId;
+			}
+
+			@Override
+			public OperatorCoordinator create(OperatorCoordinator.Context context) {
+				return new EventSendingCoordinator(context, numEvents, delay);
+			}
+		};
+		jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
+
+		return jobVertex;
+	}
+
+	/**
+	 * Creates checkpoint settings for the given vertices: a checkpoint interval of 10 ms, a timeout
+	 * of 100 s, and at most one concurrent checkpoint.
+	 */
+	private static JobCheckpointingSettings createCheckpointSettings(JobVertex... vertices) {
+		final CheckpointCoordinatorConfiguration checkpointConfig =
+			new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
+				.setCheckpointInterval(10)
+				.setCheckpointTimeout(100_000)
+				.setMaxConcurrentCheckpoints(1)
+				.build();
+
+		final List<JobVertexID> vertexIds = Arrays.stream(vertices)
+			.map(JobVertex::getID)
+			.collect(Collectors.toList());
+
+		// the same vertex list is passed for all three vertex-list arguments
+		return new JobCheckpointingSettings(vertexIds, vertexIds, vertexIds, checkpointConfig, null);
+	}
+
+ // ------------------------------------------------------------------------
+ // test operator and coordinator implementations
+ // ------------------------------------------------------------------------
+
+	/** Sent by a task to its coordinator to make the coordinator start sending events. */
+	private static final class StartEvent implements OperatorEvent {}
+
+	/** Sent by a coordinator after the last number; the receiving task finishes when it sees it. */
+	private static final class EndEvent implements OperatorEvent {}
+
+	/** Carries one number of the sequence from a coordinator to its task. */
+	private static final class IntegerEvent implements OperatorEvent {
+
+		/** The transported number. */
+		final int value;
+
+		IntegerEvent(int value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			final String prefix = "IntegerEvent ";
+			return prefix + value;
+		}
+	}
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Coordinator that sends the numbers {@code 0 .. numEvents - 1}, followed by an {@link EndEvent},
+	 * to subtask 0. It sends one event per run of a periodic task that is scheduled every
+	 * {@code delay} milliseconds, once the subtask has sent its {@link StartEvent}.
+	 *
+	 * <p>Checkpoints are completed with a lag. The run that picks up a checkpoint request only
+	 * remembers it, and the following run completes it with the next number to send. Each coordinator
+	 * instance fails the job at most once, when the next number to send reaches a random value in
+	 * {@code [numEvents / 3, 2 * (numEvents / 3))}.
+	 */
+	private static final class EventSendingCoordinator implements OperatorCoordinator, Runnable {
+
+		private final Context context;
+
+		private final ScheduledExecutorService executor;
+		private volatile ScheduledFuture<?> periodicTask;
+
+		private final int delay;
+		private final int maxNumber;
+		// only accessed from the executor thread
+		private int nextNumber;
+
+		private volatile CompletableFuture<byte[]> requestedCheckpoint;
+		// only accessed from the executor thread
+		private CompletableFuture<byte[]> nextToComplete;
+
+		private final int failAtMessage;
+		// only accessed from the executor thread
+		private boolean failedBefore;
+
+		private EventSendingCoordinator(Context context, int numEvents, int delay) {
+			checkArgument(delay > 0);
+			checkArgument(numEvents >= 3);
+
+			this.context = context;
+			this.maxNumber = numEvents;
+			this.delay = delay;
+			this.executor = Executors.newSingleThreadScheduledExecutor();
+
+			this.failAtMessage = numEvents / 3 + new Random().nextInt(numEvents / 3);
+		}
+
+		@Override
+		public void start() throws Exception {}
+
+		@Override
+		public void close() throws Exception {
+			executor.shutdownNow();
+			executor.awaitTermination(10, TimeUnit.SECONDS);
+		}
+
+		/** Starts the periodic sending when subtask 0 sends its {@link StartEvent}. */
+		@Override
+		public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+			if (subtask != 0 || !(event instanceof StartEvent)) {
+				throw new Exception(String.format("Don't recognize event '%s' from task %d.", event, subtask));
+			}
+
+			if (periodicTask != null) {
+				throw new Exception("periodic already running");
+			}
+			periodicTask = executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+		}
+
+		/**
+		 * Stops the periodic sending (if it was started) and rewinds the sequence to 0.
+		 * The subtask may fail before its StartEvent was handled, in which case no periodic task exists yet.
+		 */
+		@Override
+		public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+			final ScheduledFuture<?> task = periodicTask;
+			if (task != null) {
+				task.cancel(false);
+				periodicTask = null;
+			}
+			executor.execute(() -> nextNumber = 0);
+		}
+
+		/** Restores the next number to send, as stored by {@link #handleCheckpoint()}. */
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+			executor.execute(() -> nextNumber = bytesToInt(checkpointData));
+		}
+
+		/** Only registers the request; the periodic task completes the returned future later. */
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			requestedCheckpoint = checkpointFuture;
+			return checkpointFuture;
+		}
+
+		@Override
+		public void checkpointComplete(long checkpointId) {}
+
+		@SuppressWarnings("CallToPrintStackTrace")
+		@Override
+		public void run() {
+			try {
+				handleCheckpoint();
+				sendNextEvent();
+				checkWhetherToTriggerFailure();
+			} catch (Throwable t) {
+				// this is so that exceptions thrown in the scheduled executor don't just freeze the test
+				t.printStackTrace();
+				System.exit(-1);
+			}
+		}
+
+		private void handleCheckpoint() {
+			// we move the checkpoint one run further, so it completes after the next delay
+			if (nextToComplete != null) {
+				final int numToCheckpoint = Math.min(nextNumber, maxNumber);
+				nextToComplete.complete(intToBytes(numToCheckpoint));
+				nextToComplete = null;
+			}
+			if (requestedCheckpoint != null) {
+				nextToComplete = requestedCheckpoint;
+				requestedCheckpoint = null;
+			}
+		}
+
+		/** Sends the next number, or the EndEvent after the last one; stays put while the task is not running. */
+		private void sendNextEvent() {
+			if (nextNumber > maxNumber) {
+				return;
+			}
+			try {
+				if (nextNumber == maxNumber) {
+					context.sendEvent(new EndEvent(), 0);
+				} else {
+					context.sendEvent(new IntegerEvent(nextNumber), 0);
+				}
+				nextNumber++;
+			} catch (TaskNotRunningException ignored) {
+				// the number is not advanced, so the same event is retried in the next run
+			}
+		}
+
+		private void checkWhetherToTriggerFailure() {
+			if (nextNumber >= failAtMessage && !failedBefore) {
+				failedBefore = true;
+				context.failJob(new Exception("test failure"));
+			}
+		}
+	}
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * The runtime task that receives the events and collects the numbers. The task is stateful:
+	 * it snapshots the collected numbers on each checkpoint and restores them on recovery. When the
+	 * {@link EndEvent} arrives (and the task was not cancelled), it publishes the numbers through
+	 * a list accumulator.
+	 */
+	public static final class EventCollectingTask extends AbstractInvokable {
+
+		private final OperatorID operatorID;
+		private final String accumulatorName;
+
+		/** Events and checkpoint requests, processed one at a time by the task's main thread. */
+		private final LinkedBlockingQueue<Object> actions;
+
+		private volatile boolean running = true;
+
+		public EventCollectingTask(Environment environment) {
+			super(environment);
+			this.operatorID = OperatorID.fromJobVertexID(environment.getJobVertexId());
+			this.accumulatorName = environment.getTaskConfiguration().get(ACC_NAME);
+			this.actions = new LinkedBlockingQueue<>();
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final ArrayList<Integer> collectedInts = new ArrayList<>();
+			restoreState(collectedInts);
+
+			// tell the coordinator that it may start sending
+			getEnvironment().getOperatorCoordinatorEventGateway()
+				.sendOperatorEventToCoordinator(operatorID, new SerializedValue<>(new StartEvent()));
+
+			// poor-man's mailbox: handle actions until the EndEvent arrives or the task is cancelled
+			while (running) {
+				final Object action = actions.take();
+				if (action instanceof EndEvent) {
+					break;
+				}
+
+				if (action instanceof IntegerEvent) {
+					collectedInts.add(((IntegerEvent) action).value);
+				} else if (action instanceof CheckpointMetaData) {
+					takeCheckpoint(((CheckpointMetaData) action).getCheckpointId(), collectedInts);
+				} else {
+					throw new Exception("Unrecognized: " + action);
+				}
+			}
+
+			if (!running) {
+				return;
+			}
+			final ListAccumulator<Integer> accumulator = new ListAccumulator<>();
+			for (Integer collected : collectedInts) {
+				accumulator.add(collected);
+			}
+			getEnvironment().getAccumulatorRegistry().getUserMap().put(accumulatorName, accumulator);
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			running = false;
+		}
+
+		/** Enqueues the checkpoint request so the main thread handles it in order with the events. */
+		@Override
+		public Future<Boolean> triggerCheckpointAsync(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				boolean advanceToEndOfEventTime) {
+			actions.add(checkpointMetaData);
+			return CompletableFuture.completedFuture(true);
+		}
+
+		@Override
+		public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		/** Deserializes the coordinator event and hands it to the main thread. */
+		@Override
+		public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+			final OperatorEvent deserialized;
+			try {
+				deserialized = event.deserializeValue(getUserCodeClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new FlinkException(e);
+			}
+			actions.add(deserialized);
+		}
+
+		/** Snapshots the collected numbers and acknowledges the checkpoint with that snapshot. */
+		private void takeCheckpoint(long checkpointId, List<Integer> state) throws Exception {
+			final TaskStateSnapshot snapshot = createSnapshot(stateToHandle(state), operatorID);
+			getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), snapshot);
+		}
+
+		/** Adds the numbers from the restored snapshot (if any) to {@code target}. */
+		private void restoreState(List<Integer> target) throws Exception {
+			final StreamStateHandle stateHandle = readSnapshot(getEnvironment().getTaskStateManager(), operatorID);
+			if (stateHandle == null) {
+				return;
+			}
+			target.addAll(handleToState(stateHandle));
+		}
+	}
+
+ // ------------------------------------------------------------------------
+ // serialization shenanigans
+ // ------------------------------------------------------------------------
+
+	/** Encodes {@code value} as 4 little-endian bytes. */
+	static byte[] intToBytes(int value) {
+		return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
+	}
+
+	/** Decodes 4 little-endian bytes, as produced by {@link #intToBytes(int)}, into an int. */
+	static int bytesToInt(byte[] bytes) {
+		assertEquals(4, bytes.length);
+		final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+		buffer.order(ByteOrder.LITTLE_ENDIAN);
+		return buffer.getInt();
+	}
+
+	/** Java-serializes the collected numbers into an in-memory state handle named "state". */
+	static ByteStreamStateHandle stateToHandle(List<Integer> state) throws IOException {
+		return new ByteStreamStateHandle("state", InstantiationUtil.serializeObject(state));
+	}
+
+	/** Inverse of {@link #stateToHandle(List)}; the handle must be a {@link ByteStreamStateHandle}. */
+	static List<Integer> handleToState(StreamStateHandle handle) throws IOException, ClassNotFoundException {
+		final byte[] serialized = ((ByteStreamStateHandle) handle).getData();
+		final ClassLoader classLoader = EventCollectingTask.class.getClassLoader();
+		return InstantiationUtil.deserializeObject(serialized, classLoader);
+	}
+
+	/**
+	 * Wraps {@code handle} into a {@link TaskStateSnapshot} for {@code operatorId}. The handle is
+	 * placed in the first state collection of the {@link OperatorSubtaskState} under one named state
+	 * with offset 0. {@link #readSnapshot} reads that collection back as managed operator state.
+	 */
+	static TaskStateSnapshot createSnapshot(StreamStateHandle handle, OperatorID operatorId) {
+		final long[] offsets = {0};
+		final OperatorStateHandle.StateMetaInfo metaInfo =
+			new OperatorStateHandle.StateMetaInfo(offsets, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
+		final OperatorStateHandle operatorState = new OperatorStreamStateHandle(
+			Collections.singletonMap("état_et_moi_:_ça_fait_deux", metaInfo), handle);
+
+		final OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+			StateObjectCollection.singleton(operatorState),
+			StateObjectCollection.empty(),
+			StateObjectCollection.empty(),
+			StateObjectCollection.empty());
+
+		return new TaskStateSnapshot(Collections.singletonMap(operatorId, subtaskState));
+	}
+
+	/**
+	 * Returns the single state handle that {@link #createSnapshot} stored for {@code operatorId},
+	 * taken from the first entry of the prioritized managed operator state.
+	 *
+	 * @return the restored handle, or {@code null} if the task was not restored from a checkpoint
+	 */
+	@Nullable
+	static StreamStateHandle readSnapshot(TaskStateManager stateManager, OperatorID operatorId) {
+		final PrioritizedOperatorSubtaskState poss = stateManager.prioritizedOperatorState(operatorId);
+		if (!poss.isRestored()) {
+			return null;
+		}
+
+		// reuse the prioritized state fetched above instead of looking it up a second time
+		final StateObjectCollection<OperatorStateHandle> opState =
+			poss.getPrioritizedManagedOperatorState().get(0);
+		final OperatorStateHandle handle = Iterators.getOnlyElement(opState.iterator());
+		return handle.getDelegateStateHandle();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
index 16acbb2..7cab82f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
@@ -16,61 +16,65 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-
-import java.util.Collection;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}.
*/
-public final class OperatorCoordinatorCheckpointContext {
-
- private final OperatorCoordinator coordinator;
+public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
private final OperatorID operatorId;
-
+ private final OperatorCoordinator coordinator;
+ private final int parallelism;
private final int maxParallelism;
- private final int currentParallelism;
+ /** Creates a mock with a new OperatorID, a MockOperatorCoordinator, parallelism 50 and max parallelism 256. */
+ public MockOperatorCoordinatorCheckpointContext() {
+ this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+ }
- public OperatorCoordinatorCheckpointContext(
- OperatorCoordinator coordinator,
+ /** Creates a mock from the given parts; the arguments are stored as given, without null checks. */
+ public MockOperatorCoordinatorCheckpointContext(
OperatorID operatorId,
- int maxParallelism,
- int currentParallelism) {
-
- this.coordinator = checkNotNull(coordinator);
- this.operatorId = checkNotNull(operatorId);
+ OperatorCoordinator coordinator,
+ int parallelism,
+ int maxParallelism) {
+ this.operatorId = operatorId;
+ this.coordinator = coordinator;
+ this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
- this.currentParallelism = currentParallelism;
}
+ @Override
public OperatorCoordinator coordinator() {
return coordinator;
}
+ @Override
public OperatorID operatorId() {
return operatorId;
}
+ @Override
public int maxParallelism() {
return maxParallelism;
}
+ @Override
public int currentParallelism() {
- return currentParallelism;
+ return parallelism;
}
- public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
- return infos.stream()
- .map(OperatorCoordinatorCheckpointContext::operatorId)
- .collect(Collectors.toList());
- }
+ // the checkpoint callbacks below do nothing in this mock
+ @Override
+ public void onCallTriggerCheckpoint(long checkpointId) {}
+
+ @Override
+ public void onCheckpointStateFutureComplete(long checkpointId) {}
+
+ @Override
+ public void afterSourceBarrierInjection(long checkpointId) {}
+
+ @Override
+ public void abortCurrentTriggering() {}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 14a136d..8e5404a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -505,11 +505,11 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
final Optional<OperatorCoordinatorHolder> coordinatorOptional = vertexWithCoordinator
.getOperatorCoordinators()
.stream()
- .filter((holder) -> holder.getOperatorId().equals(testOperatorId))
+ .filter((holder) -> holder.operatorId().equals(testOperatorId))
.findFirst();
assertTrue("vertex does not contain coordinator", coordinatorOptional.isPresent());
- final OperatorCoordinator coordinator = coordinatorOptional.get().getCoordinator();
+ final OperatorCoordinator coordinator = coordinatorOptional.get().coordinator();
assertThat(coordinator, instanceOf(TestingOperatorCoordinator.class));
return (TestingOperatorCoordinator) coordinator;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
new file mode 100644
index 0000000..9b0325d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link OperatorEventValve}. Events pass an open valve, are held back by a shut
+ * valve, are released when the valve opens again, and are dropped on reset.
+ */
+public class OperatorEventValveTest {
+
+	@Test
+	public void eventsPassThroughOpenValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve openValve = new OperatorEventValve(sender);
+		final OperatorEvent event = new TestOperatorEvent();
+
+		final CompletableFuture<Acknowledge> sendFuture = openValve.sendEvent(new SerializedValue<>(event), 11);
+
+		assertThat(sender.events, contains(new EventWithSubtask(event, 11)));
+		assertTrue(sendFuture.isDone());
+	}
+
+	@Test
+	public void eventsBlockedByClosedValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve shutValve = createShutValve(sender);
+
+		final CompletableFuture<Acknowledge> sendFuture =
+			shutValve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+
+		assertTrue(sender.events.isEmpty());
+		assertFalse(sendFuture.isDone());
+	}
+
+	@Test
+	public void eventsReleasedAfterOpeningValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = createShutValve(sender);
+		final OperatorEvent toSubtask3 = new TestOperatorEvent();
+		final OperatorEvent toSubtask0 = new TestOperatorEvent();
+
+		final CompletableFuture<Acknowledge> future3 = valve.sendEvent(new SerializedValue<>(toSubtask3), 3);
+		final CompletableFuture<Acknowledge> future0 = valve.sendEvent(new SerializedValue<>(toSubtask0), 0);
+		valve.openValve();
+
+		// the release order across different subtasks is not asserted
+		assertThat(sender.events, containsInAnyOrder(
+			new EventWithSubtask(toSubtask3, 3),
+			new EventWithSubtask(toSubtask0, 0)));
+		assertTrue(future3.isDone());
+		assertTrue(future0.isDone());
+	}
+
+	@Test
+	public void releasedEventsForwardSendFailures() throws Exception {
+		final TestEventSender failingSender = new TestEventSender(new FlinkException("test"));
+		final OperatorEventValve valve = createShutValve(failingSender);
+
+		final CompletableFuture<Acknowledge> sendFuture =
+			valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
+		valve.openValve();
+
+		assertTrue(sendFuture.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetDropsAllEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = createShutValve(sender);
+		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
+		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+
+		valve.reset();
+		valve.openValve();
+
+		assertTrue(sender.events.isEmpty());
+	}
+
+	@Test
+	public void resetForTaskDropsSelectiveEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = createShutValve(sender);
+		final OperatorEvent keptEvent = new TestOperatorEvent();
+		final OperatorEvent droppedEvent = new TestOperatorEvent();
+
+		final CompletableFuture<Acknowledge> keptFuture = valve.sendEvent(new SerializedValue<>(keptEvent), 0);
+		final CompletableFuture<Acknowledge> droppedFuture = valve.sendEvent(new SerializedValue<>(droppedEvent), 1);
+		valve.resetForTask(1);
+		valve.openValve();
+
+		assertThat(sender.events, contains(new EventWithSubtask(keptEvent, 0)));
+		assertTrue(keptFuture.isDone());
+		assertTrue(droppedFuture.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetOpensValve() throws Exception {
+		final OperatorEventValve valve = createShutValve(new TestEventSender());
+
+		valve.reset();
+
+		assertFalse(valve.isShut());
+	}
+
+	/** Creates a valve on top of the given sender and shuts it immediately. */
+	private static OperatorEventValve createShutValve(TestEventSender sender) throws Exception {
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+		return valve;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
new file mode 100644
index 0000000..8b93dea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * A test implementation of the BiFunction interface used as the underlying event sender in the
+ * {@link OperatorCoordinatorHolder}. Every event passed to it is deserialized and recorded,
+ * together with its target subtask, in {@link #events}.
+ */
+final class TestEventSender
+		implements BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> {
+
+	/** The sent events and their target subtasks, in the order in which they were sent. */
+	final ArrayList<EventWithSubtask> events = new ArrayList<>();
+
+	/** When non-null, every send-future is completed exceptionally with this cause. */
+	@Nullable
+	private final Throwable failureCause;
+
+	/**
+	 * Creates a sender that collects events and acknowledges all events successfully.
+	 */
+	TestEventSender() {
+		this(null);
+	}
+
+	/**
+	 * Creates a sender that collects events and fails all the send-futures with the given exception,
+	 * if it is non-null.
+	 */
+	TestEventSender(@Nullable Throwable failureCause) {
+		this.failureCause = failureCause;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> apply(SerializedValue<OperatorEvent> event, Integer subtask) {
+		events.add(new EventWithSubtask(deserialize(event), subtask));
+
+		if (failureCause != null) {
+			return FutureUtils.completedExceptionally(failureCause);
+		}
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	/** Deserializes the event with this class's class loader; a failure here is a test error. */
+	private OperatorEvent deserialize(SerializedValue<OperatorEvent> serializedEvent) {
+		try {
+			return serializedEvent.deserializeValue(getClass().getClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new AssertionError(e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A combination of an {@link OperatorEvent} and the target subtask it is sent to.
+	 */
+	static final class EventWithSubtask {
+
+		final OperatorEvent event;
+		final int subtask;
+
+		EventWithSubtask(OperatorEvent event, int subtask) {
+			this.event = event;
+			this.subtask = subtask;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final EventWithSubtask other = (EventWithSubtask) o;
+			return subtask == other.subtask && event.equals(other.event);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(event, subtask);
+		}
+
+		@Override
+		public String toString() {
+			return event + " => subtask " + subtask;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
index 6eb600b..8622143 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
@@ -23,4 +23,37 @@ package org.apache.flink.runtime.operators.coordination;
*/
public final class TestOperatorEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;
+
+	/** Defines equality and the hash code of the event. */
+	private final int value;
+
+	/**
+	 * Creates an event whose value is this object's identity hash code. Separately created events
+	 * are therefore very likely, though not guaranteed, to be unequal.
+	 */
+	public TestOperatorEvent() {
+		this.value = System.identityHashCode(this);
+	}
+
+	/** Creates an event with the given value. */
+	public TestOperatorEvent(int value) {
+		this.value = value;
+	}
+
+	public int getValue() {
+		return value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		// the class is final, so instanceof is equivalent to an exact class check
+		return obj instanceof TestOperatorEvent && ((TestOperatorEvent) obj).value == value;
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	@Override
+	public String toString() {
+		return "TestOperatorEvent (" + value + ')';
+	}
}