You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:03 UTC

[flink] branch master updated (955a683 -> d854b5f)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 955a683  [FLINK-17842][network] Remove NextRecordResponse to improve deserialisation performance
     new b66d16f  [hotfix][runtime] Fox log message for web.log.file to only pring config key and not deprecated keys
     new 1af33f1  [hotfix][checkpointing] Beautify stack trace by stripping wrapping CompletionExceptions
     new b144f13  [hotfix][checkpointing] Improve exception in case Coordinator State ack fails
     new 1a721d8  [FLINK-16986][coordination] (part 1) Provide exactly-once guarantees around checkpoints and operator event sending.
     new b233aa8  [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext
     new 7ceee23  [FLINK-16986][coordination][refactor] Change executor in OperatorCoordinatorSchedulerTest
     new 37f7db3  [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor
     new 5926e07  [FLINK-16986][coordination] (part 3) Change OperatorCoordinator interface to support better exactly-once semantics
     new 4aff693  [hotfix][coordination] Improve JavaDocs for OperatorCoordinator and OperatorCoordinatorHolder
     new d046fea  [hotfix][coordination] Remove unused class ExecutionJobVertexCoordinatorContext
     new d854b5f  [hotfix][coordination] Remove unused (and unimplemented) method 'failTask(...)' from OperatorCoordinator Context

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  |  18 +-
 .../OperatorCoordinatorCheckpointContext.java      |  55 +-
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  26 +-
 .../runtime/checkpoint/PendingCheckpoint.java      |   3 +-
 .../runtime/executiongraph/ExecutionGraph.java     |   9 +-
 .../ExecutionJobVertexCoordinatorContext.java      |  85 ----
 .../coordination/OperatorCoordinator.java          |  93 +++-
 .../coordination/OperatorCoordinatorHolder.java    | 290 +++++++++--
 .../operators/coordination/OperatorEventValve.java | 245 +++++++++
 .../operators/coordination/OperatorInfo.java}      |  34 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |   6 +-
 .../source/coordinator/SourceCoordinator.java      |  13 +-
 .../flink/runtime/webmonitor/WebMonitorUtils.java  |   2 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  31 +-
 .../CoordinatorEventsExactlyOnceITCase.java        | 499 ++++++++++++++++++
 .../coordination/MockOperatorCoordinator.java      |   2 +-
 .../MockOperatorCoordinatorContext.java            |  12 -
 .../OperatorCoordinatorHolderTest.java             | 560 +++++++++++++++++++++
 .../OperatorCoordinatorSchedulerTest.java          |  47 +-
 .../coordination/OperatorEventValveTest.java       | 170 +++++++
 .../operators/coordination/TestEventSender.java    | 113 +++++
 .../operators/coordination/TestOperatorEvent.java  |  33 ++
 .../coordination/TestingOperatorCoordinator.java   |  16 +-
 .../coordination/TestingOperatorInfo.java}         |  49 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   |  10 +-
 .../source/coordinator/SourceCoordinatorTest.java  |  24 +-
 .../collect/CollectSinkOperatorCoordinator.java    |   5 +-
 27 files changed, 2148 insertions(+), 302 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
 copy flink-runtime/src/{test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java => main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java} (63%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java => test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java} (56%)


[flink] 11/11: [hotfix][coordination] Remove unused (and unimplemented) method 'failTask(...)' from OperatorCoordinator Context

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d854b5fd5ba6741aeb87cd10ae570b3bb198d498
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 21:47:33 2020 +0200

    [hotfix][coordination] Remove unused (and unimplemented) method 'failTask(...)' from OperatorCoordinator Context
---
 .../runtime/operators/coordination/OperatorCoordinator.java  |  2 --
 .../operators/coordination/OperatorCoordinatorHolder.java    |  5 -----
 .../coordination/MockOperatorCoordinatorContext.java         | 12 ------------
 3 files changed, 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index cac03d7..fff88c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -168,8 +168,6 @@ public interface OperatorCoordinator extends AutoCloseable {
 		 */
 		CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) throws TaskNotRunningException;
 
-		void failTask(int subtask, Throwable cause);
-
 		/**
 		 * Fails the job and trigger a global failover operation.
 		 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index e11bbb5..4e4e8f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -425,11 +425,6 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 		}
 
 		@Override
-		public void failTask(final int subtask, final Throwable cause) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public void failJob(final Throwable cause) {
 			checkInitialized();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
index 7928535..6b50c2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -35,7 +34,6 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 	private final boolean failEventSending;
 
 	private final Map<Integer, List<OperatorEvent>> eventsToOperator;
-	private final LinkedHashMap<Integer, Throwable> failedTasks;
 	private boolean jobFailed;
 
 	public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks) {
@@ -46,7 +44,6 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 		this.operatorID = operatorID;
 		this.numSubtasks = numSubtasks;
 		this.eventsToOperator = new HashMap<>();
-		this.failedTasks = new LinkedHashMap<>();
 		this.jobFailed = false;
 		this.failEventSending = failEventSending;
 	}
@@ -71,11 +68,6 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 	}
 
 	@Override
-	public void failTask(int subtask, Throwable cause) {
-		failedTasks.put(subtask, cause);
-	}
-
-	@Override
 	public void failJob(Throwable cause) {
 		jobFailed = true;
 	}
@@ -95,10 +87,6 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 		return eventsToOperator;
 	}
 
-	public LinkedHashMap<Integer, Throwable> getFailedTasks() {
-		return failedTasks;
-	}
-
 	public boolean isJobFailed() {
 		return jobFailed;
 	}


[flink] 05/11: [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b233aa84a1b0d8354211506219d0f785bccae870
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:18:41 2020 +0200

    [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext
    
    This simplifies both testing and future refactoring.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   7 +-
 .../OperatorCoordinatorCheckpointContext.java      |  41 +----
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  20 +--
 .../runtime/checkpoint/PendingCheckpoint.java      |   3 +-
 .../coordination/OperatorCoordinatorHolder.java    | 172 +++++++++++++--------
 .../coordination/OperatorInfo.java}                |  30 +---
 .../runtime/checkpoint/PendingCheckpointTest.java  |  27 ++--
 ...kpointContext.java => TestingOperatorInfo.java} |  31 +---
 8 files changed, 151 insertions(+), 180 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 51b98f5..da518ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -626,7 +627,7 @@ public class CheckpointCoordinator {
 			checkpointID,
 			timestamp,
 			ackTasks,
-			OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
+			OperatorInfo.getIds(coordinatorsToCheckpoint),
 			masterHooks.keySet(),
 			props,
 			checkpointStorageLocation,
@@ -1074,7 +1075,7 @@ public class CheckpointCoordinator {
 
 		// commit coordinators
 		for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
-			coordinatorContext.coordinator().checkpointComplete(checkpointId);
+			coordinatorContext.checkpointComplete(checkpointId);
 		}
 	}
 
@@ -1496,7 +1497,7 @@ public class CheckpointCoordinator {
 
 			final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
 			if (coordinatorState != null) {
-				coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+				coordContext.resetToCheckpoint(coordinatorState.getData());
 			}
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 92fd2aa..abc15b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -18,49 +18,24 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * This context is the interface through which the {@link CheckpointCoordinator} interacts with an
+ * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
 
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	OperatorCoordinator coordinator();
-
-	OperatorID operatorId();
-
-	int maxParallelism();
-
-	int currentParallelism();
-
-	// ------------------------------------------------------------------------
-	//  checkpoint triggering callbacks
-	// ------------------------------------------------------------------------
-
-	void onCallTriggerCheckpoint(long checkpointId);
-
-	void onCheckpointStateFutureComplete(long checkpointId);
+	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
 
 	void afterSourceBarrierInjection(long checkpointId);
 
 	void abortCurrentTriggering();
 
-	// ------------------------------------------------------------------------
-	//  utils
-	// ------------------------------------------------------------------------
+	void checkpointComplete(long checkpointId);
 
-	static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
-		return infos.stream()
-			.map(OperatorCoordinatorCheckpointContext::operatorId)
-			.collect(Collectors.toList());
-	}
+	void resetToCheckpoint(byte[] checkpointData) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 68fc8f1..6215123 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import java.util.ArrayList;
@@ -39,15 +40,15 @@ import java.util.concurrent.Executor;
 final class OperatorCoordinatorCheckpoints {
 
 	public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
-			final OperatorCoordinatorCheckpointContext coordinatorInfo,
+			final OperatorCoordinatorCheckpointContext coordinatorContext,
 			final long checkpointId) throws Exception {
 
 		final CompletableFuture<byte[]> checkpointFuture =
-				coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
+			coordinatorContext.checkpointCoordinator(checkpointId);
 
 		return checkpointFuture.thenApply(
 				(state) -> new CoordinatorSnapshot(
-						coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
+						coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state))
 		);
 	}
 
@@ -59,16 +60,7 @@ final class OperatorCoordinatorCheckpoints {
 
 		for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
 			final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
-			coordinator.onCallTriggerCheckpoint(checkpointId);
-
 			individualSnapshots.add(checkpointFuture);
-			checkpointFuture.whenComplete((ignored, failure) -> {
-				if (failure != null) {
-					coordinator.abortCurrentTriggering();
-				} else {
-					coordinator.onCheckpointStateFutureComplete(checkpointId);
-				}
-			});
 		}
 
 		return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
@@ -144,10 +136,10 @@ final class OperatorCoordinatorCheckpoints {
 
 	static final class CoordinatorSnapshot {
 
-		final OperatorCoordinatorCheckpointContext coordinator;
+		final OperatorInfo coordinator;
 		final ByteStreamStateHandle state;
 
-		CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, ByteStreamStateHandle state) {
+		CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
 			this.coordinator = coordinator;
 			this.state = state;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2a0eba7..376301d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -431,7 +432,7 @@ public class PendingCheckpoint {
 	}
 
 	public TaskAcknowledgeResult acknowledgeCoordinatorState(
-			OperatorCoordinatorCheckpointContext coordinatorInfo,
+			OperatorInfo coordinatorInfo,
 			@Nullable ByteStreamStateHandle stateHandle) {
 
 		synchronized (lock) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index c70e229..304eb20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -67,7 +69,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	private final int operatorParallelism;
 	private final int operatorMaxParallelism;
 
-	private long currentlyTriggeredCheckpoint;
+	private volatile long currentlyTriggeredCheckpoint;
 
 	private OperatorCoordinatorHolder(
 			final OperatorID operatorId,
@@ -88,21 +90,25 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	}
 
 	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-		context.lazyInitialize(scheduler, schedulerExecutor);
+		lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor);
+	}
+
+	@VisibleForTesting
+	void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
+		context.lazyInitialize(globalFailureHandler, schedulerExecutor);
 	}
 
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
 
-	@Override
-	public OperatorID operatorId() {
-		return operatorId;
+	public OperatorCoordinator coordinator() {
+		return coordinator;
 	}
 
 	@Override
-	public OperatorCoordinator coordinator() {
-		return coordinator;
+	public OperatorID operatorId() {
+		return operatorId;
 	}
 
 	@Override
@@ -116,39 +122,6 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	}
 
 	// ------------------------------------------------------------------------
-	//  Checkpointing Callbacks
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void onCallTriggerCheckpoint(long checkpointId) {
-		checkCheckpointAlreadyHappening(checkpointId);
-		currentlyTriggeredCheckpoint = checkpointId;
-	}
-
-	@Override
-	public void onCheckpointStateFutureComplete(long checkpointId) {
-		checkCheckpointAlreadyHappening(checkpointId);
-		eventValve.shutValve();
-	}
-
-	@Override
-	public void afterSourceBarrierInjection(long checkpointId) {
-		checkCheckpointAlreadyHappening(checkpointId);
-		eventValve.openValve();
-		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
-	}
-
-	@Override
-	public void abortCurrentTriggering() {
-		eventValve.openValve();
-		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
-	}
-
-	private void checkCheckpointAlreadyHappening(long checkpointId) {
-		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
-	}
-
-	// ------------------------------------------------------------------------
 	//  OperatorCoordinator Interface
 	// ------------------------------------------------------------------------
 
@@ -177,7 +150,20 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-		return coordinator.checkpointCoordinator(checkpointId);
+		setCurrentlyTriggeredCheckpoint(checkpointId);
+
+		final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
+
+		// synchronously!!!, with the completion, we need to shut the event valve
+		checkpointFuture.whenComplete((ignored, failure) -> {
+			if (failure != null) {
+				abortCurrentTriggering();
+			} else {
+				onCheckpointStateFutureComplete(checkpointId);
+			}
+		});
+
+		return checkpointFuture;
 	}
 
 	@Override
@@ -187,10 +173,47 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		resetCheckpointTriggeringCheck();
+		eventValve.reset();
 		coordinator.resetToCheckpoint(checkpointData);
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing Callbacks
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void afterSourceBarrierInjection(long checkpointId) {
+		verifyNoOtherCheckpointBeingTriggered(checkpointId);
+		eventValve.openValve();
+		resetCheckpointTriggeringCheck();
+	}
+
+	@Override
+	public void abortCurrentTriggering() {
+		eventValve.openValve();
+		resetCheckpointTriggeringCheck();
+	}
+
+	void onCheckpointStateFutureComplete(long checkpointId) {
+		verifyNoOtherCheckpointBeingTriggered(checkpointId);
+		eventValve.shutValve();
+	}
+
+	private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
+		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
+	}
+
+	private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
+		verifyNoOtherCheckpointBeingTriggered(checkpointId);
+		currentlyTriggeredCheckpoint = checkpointId;
+	}
+
+	private void resetCheckpointTriggeringCheck() {
+		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+	}
+
+	// ------------------------------------------------------------------------
 	//  Factories
 	// ------------------------------------------------------------------------
 
@@ -209,20 +232,41 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 					return executionAttempt.sendOperatorEvent(opId, serializedEvent);
 				};
 
-			final OperatorEventValve valve = new OperatorEventValve(eventSender);
-			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve);
-			final OperatorCoordinator coordinator = provider.create(context);
-
-			return new OperatorCoordinatorHolder(
+			return create(
 					opId,
-					coordinator,
-					context,
-					valve,
+					provider,
+					eventSender,
+					jobVertex.getName(),
 					jobVertex.getParallelism(),
 					jobVertex.getMaxParallelism());
 		}
 	}
 
+	@VisibleForTesting
+	static OperatorCoordinatorHolder create(
+			final OperatorID opId,
+			final OperatorCoordinator.Provider coordinatorProvider,
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+			final String operatorName,
+			final int operatorParallelism,
+			final int operatorMaxParallelism) {
+
+		final OperatorEventValve valve = new OperatorEventValve(eventSender);
+
+		final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(
+				opId, valve, operatorName, operatorParallelism);
+
+		final OperatorCoordinator coordinator = coordinatorProvider.create(context);
+
+		return new OperatorCoordinatorHolder(
+				opId,
+				coordinator,
+				context,
+				valve,
+				operatorParallelism,
+				operatorMaxParallelism);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Nested Classes
 	// ------------------------------------------------------------------------
@@ -239,33 +283,36 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
 
 		private final OperatorID operatorId;
-		private final ExecutionJobVertex jobVertex;
 		private final OperatorEventValve eventValve;
+		private final String operatorName;
+		private final int operatorParallelism;
 
-		private SchedulerNG scheduler;
+		private Consumer<Throwable> globalFailureHandler;
 		private Executor schedulerExecutor;
 
 		public LazyInitializedCoordinatorContext(
-				OperatorID operatorId,
-				ExecutionJobVertex jobVertex,
-				OperatorEventValve eventValve) {
+				final OperatorID operatorId,
+				final OperatorEventValve eventValve,
+				final String operatorName,
+				final int operatorParallelism) {
 			this.operatorId = checkNotNull(operatorId);
-			this.jobVertex = checkNotNull(jobVertex);
 			this.eventValve = checkNotNull(eventValve);
+			this.operatorName = checkNotNull(operatorName);
+			this.operatorParallelism = operatorParallelism;
 		}
 
-		void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-			this.scheduler = checkNotNull(scheduler);
+		void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
+			this.globalFailureHandler = checkNotNull(globalFailureHandler);
 			this.schedulerExecutor = checkNotNull(schedulerExecutor);
 		}
 
 		void unInitialize() {
-			this.scheduler = null;
+			this.globalFailureHandler = null;
 			this.schedulerExecutor = null;
 		}
 
 		boolean isInitialized() {
-			return jobVertex != null;
+			return schedulerExecutor != null;
 		}
 
 		private void checkInitialized() {
@@ -309,15 +356,14 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 			checkInitialized();
 
 			final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" +
-				jobVertex.getName() + "' (operator " + operatorId + ").", cause);
+				operatorName + "' (operator " + operatorId + ").", cause);
 
-			schedulerExecutor.execute(() -> scheduler.handleGlobalFailure(e));
+			schedulerExecutor.execute(() -> globalFailureHandler.accept(e));
 		}
 
 		@Override
 		public int currentParallelism() {
-			checkInitialized();
-			return jobVertex.getParallelism();
+			return operatorParallelism;
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
similarity index 54%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
index 92fd2aa..a146940 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
@@ -16,25 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import java.util.Collection;
 import java.util.stream.Collectors;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * An interface to access basic properties of an operator in the context of its coordinator.
  */
-public interface OperatorCoordinatorCheckpointContext {
-
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	OperatorCoordinator coordinator();
+public interface OperatorInfo {
 
 	OperatorID operatorId();
 
@@ -43,24 +35,12 @@ public interface OperatorCoordinatorCheckpointContext {
 	int currentParallelism();
 
 	// ------------------------------------------------------------------------
-	//  checkpoint triggering callbacks
-	// ------------------------------------------------------------------------
-
-	void onCallTriggerCheckpoint(long checkpointId);
-
-	void onCheckpointStateFutureComplete(long checkpointId);
-
-	void afterSourceBarrierInjection(long checkpointId);
-
-	void abortCurrentTriggering();
-
-	// ------------------------------------------------------------------------
 	//  utils
 	// ------------------------------------------------------------------------
 
-	static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+	static Collection<OperatorID> getIds(Collection<? extends OperatorInfo> infos) {
 		return infos.stream()
-			.map(OperatorCoordinatorCheckpointContext::operatorId)
+			.map(OperatorInfo::operatorId)
 			.collect(Collectors.toList());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index b454633..30c1dc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -438,7 +439,7 @@ public class PendingCheckpointTest {
 	@Test
 	public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(
-				createOperatorCoordinator(), createOperatorCoordinator());
+				new TestingOperatorInfo(), new TestingOperatorInfo());
 
 		assertEquals(2, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
 		assertFalse(checkpoint.isFullyAcknowledged());
@@ -446,8 +447,8 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testAcknowledgedCoordinatorStates() throws Exception {
-		final OperatorCoordinatorCheckpointContext coord1 = createOperatorCoordinator();
-		final OperatorCoordinatorCheckpointContext coord2 = createOperatorCoordinator();
+		final OperatorInfo coord1 = new TestingOperatorInfo();
+		final OperatorInfo coord2 = new TestingOperatorInfo();
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coord1, coord2);
 
 		final TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle());
@@ -462,7 +463,7 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testDuplicateAcknowledgeCoordinator() throws Exception {
-		final OperatorCoordinatorCheckpointContext coordinator = createOperatorCoordinator();
+		final OperatorInfo coordinator = new TestingOperatorInfo();
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coordinator);
 
 		checkpoint.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle());
@@ -473,9 +474,9 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testAcknowledgeUnknownCoordinator() throws Exception {
-		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(createOperatorCoordinator());
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(new TestingOperatorInfo());
 
-		final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null);
+		final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(new TestingOperatorInfo(), null);
 
 		assertEquals(TaskAcknowledgeResult.UNKNOWN, ack);
 	}
@@ -507,11 +508,11 @@ public class PendingCheckpointTest {
 	}
 
 	private PendingCheckpoint createPendingCheckpointWithCoordinators(
-			OperatorCoordinatorCheckpointContext... coordinators) throws IOException {
+				OperatorInfo... coordinators) throws IOException {
 
 		final PendingCheckpoint checkpoint = createPendingCheckpoint(
 				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)),
+				OperatorInfo.getIds(Arrays.asList(coordinators)),
 				Collections.emptyList(),
 				Executors.directExecutor());
 
@@ -520,9 +521,9 @@ public class PendingCheckpointTest {
 	}
 
 	private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle... handles) throws IOException {
-		OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length];
+		final OperatorInfo[] coords = new OperatorInfo[handles.length];
 		for (int i = 0; i < handles.length; i++) {
-			coords[i] = createOperatorCoordinator();
+			coords[i] = new TestingOperatorInfo();
 		}
 
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coords);
@@ -562,10 +563,6 @@ public class PendingCheckpointTest {
 			new CompletableFuture<>());
 	}
 
-	private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
-		return new MockOperatorCoordinatorCheckpointContext();
-	}
-
 	@SuppressWarnings("unchecked")
 	static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
 		Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
similarity index 60%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
index 7cab82f..f8d4a1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
@@ -18,40 +18,31 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 /**
- * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}.
+ * A testing implementation of the {@link OperatorInfo}.
  */
-public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
+public class TestingOperatorInfo implements OperatorInfo {
 
 	private final OperatorID operatorId;
-	private final OperatorCoordinator coordinator;
 	private final int parallelism;
 	private final int maxParallelism;
 
-	public MockOperatorCoordinatorCheckpointContext() {
-		this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+	public TestingOperatorInfo() {
+		this(new OperatorID(), 50, 256);
 	}
 
-	public MockOperatorCoordinatorCheckpointContext(
+	public TestingOperatorInfo(
 			OperatorID operatorId,
-			OperatorCoordinator coordinator,
 			int parallelism,
 			int maxParallelism) {
 		this.operatorId = operatorId;
-		this.coordinator = coordinator;
 		this.parallelism = parallelism;
 		this.maxParallelism = maxParallelism;
 	}
 
 	@Override
-	public OperatorCoordinator coordinator() {
-		return coordinator;
-	}
-
-	@Override
 	public OperatorID operatorId() {
 		return operatorId;
 	}
@@ -65,16 +56,4 @@ public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordin
 	public int currentParallelism() {
 		return parallelism;
 	}
-
-	@Override
-	public void onCallTriggerCheckpoint(long checkpointId) {}
-
-	@Override
-	public void onCheckpointStateFutureComplete(long checkpointId) {}
-
-	@Override
-	public void afterSourceBarrierInjection(long checkpointId) {}
-
-	@Override
-	public void abortCurrentTriggering() {}
 }


[flink] 02/11: [hotfix][checkpointing] Beautify stack trace by stripping wrapping CompletionExceptions

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1af33f1285d557f0171f4587d7f4e789df27e7cb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 11:13:34 2020 +0200

    [hotfix][checkpointing] Beautify stack trace by stripping wrapping CompletionExceptions
---
 .../org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java     | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7689f29..b283a2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -773,6 +773,9 @@ public class CheckpointCoordinator {
 	 * @param throwable the reason of trigger failure
 	 */
 	private void onTriggerFailure(@Nullable PendingCheckpoint checkpoint, Throwable throwable) {
+		// beautify the stack trace a bit
+		throwable = ExceptionUtils.stripCompletionException(throwable);
+
 		try {
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();


[flink] 08/11: [FLINK-16986][coordination] (part 3) Change OperatorCoordinator interface to support better exactly-once semantics

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5926e07f01f6c91a8a0265e5f4b30086572d8125
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 18:23:45 2020 +0200

    [FLINK-16986][coordination] (part 3) Change OperatorCoordinator interface to support better exactly-once semantics
    
    The semantics are defined as follows:
      - The OperatorCoordinator implementation must have a way of strictly ordering the sending of events and
        the completion of the checkpoint future (for example the same thread does both actions, or the actions
        are guarded by a mutex).
      - Every event sent before the checkpoint future is completed should be before the checkpoint
      - Every event sent after the checkpoint future is completed should be after the checkpoint
    
    The previous interface did not allow us to observe this point accurately. The future was created inside the
    application-specific OperatorCoordinator code and returned from the methods. By the time that the scheduler/checkpointing
    code could observe the future (attach handlers to it), some (small amount of) time had inevitably passed in the meantime.
    Within that time, the future could already be complete and some events could have been sent, and in that case the
    scheduler/checkpointing code could not determine which events were before the completion of the future, and which
    events were after the completion of the future.
    
    The changed interface passes the future from the scheduler/checkpointing code into the coordinator. The future already
    has synchronous handlers attached to it which exactly mark the point when the future was completed, allowing the
    scheduler/checkpointing code to observe the correct order in which the Operator Coordinator implementation
    performed its actions (event sending, future completion).
---
 .../OperatorCoordinatorCheckpointContext.java      |  2 +-
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  4 +-
 .../coordination/OperatorCoordinator.java          |  2 +-
 .../coordination/OperatorCoordinatorHolder.java    | 28 ++++++--------
 .../source/coordinator/SourceCoordinator.java      | 13 ++++---
 .../CoordinatorEventsExactlyOnceITCase.java        |  6 +--
 .../coordination/MockOperatorCoordinator.java      |  2 +-
 .../OperatorCoordinatorHolderTest.java             | 44 ++++++++++++----------
 .../OperatorCoordinatorSchedulerTest.java          |  2 +-
 .../coordination/TestingOperatorCoordinator.java   |  8 +---
 .../source/coordinator/SourceCoordinatorTest.java  | 24 +++++++++---
 .../collect/CollectSinkOperatorCoordinator.java    |  5 ++-
 12 files changed, 75 insertions(+), 65 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index abc15b8..94d4e07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
 
-	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+	void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
 	void afterSourceBarrierInjection(long checkpointId);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 6215123..8de997a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -43,8 +43,8 @@ final class OperatorCoordinatorCheckpoints {
 			final OperatorCoordinatorCheckpointContext coordinatorContext,
 			final long checkpointId) throws Exception {
 
-		final CompletableFuture<byte[]> checkpointFuture =
-			coordinatorContext.checkpointCoordinator(checkpointId);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		coordinatorContext.checkpointCoordinator(checkpointId, checkpointFuture);
 
 		return checkpointFuture.thenApply(
 				(state) -> new CoordinatorSnapshot(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index cb388b2..dc06ae0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -77,7 +77,7 @@ public interface OperatorCoordinator extends AutoCloseable {
 
 	// ------------------------------------------------------------------------
 
-	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+	void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
 	/**
 	 * Notifies the coordinator that the checkpoint with the given checkpointId completes and
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 9e68c7b..321b401 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -151,14 +151,12 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
 		// unfortunately, this method does not run in the scheduler executor, but in the
 		// checkpoint coordinator time thread.
 		// we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
 		// main thread executor
-		final CompletableFuture<byte[]> future = new CompletableFuture<>();
-		mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, future));
-		return future;
+		mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
 	}
 
 	@Override
@@ -187,19 +185,8 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) {
 		mainThreadExecutor.assertRunningInMainThread();
 
-		final CompletableFuture<byte[]> checkpointFuture;
-		try {
-			eventValve.markForCheckpoint(checkpointId);
-			checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
-		} catch (Throwable t) {
-			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-			result.completeExceptionally(t);
-			globalFailureHandler.accept(t);
-			return;
-		}
-
 		// synchronously!!!, with the completion, we need to shut the event valve
-		checkpointFuture.whenComplete((success, failure) -> {
+		result.whenComplete((success, failure) -> {
 			if (failure != null) {
 				result.completeExceptionally(failure);
 			} else {
@@ -211,6 +198,15 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 				}
 			}
 		});
+
+		try {
+			eventValve.markForCheckpoint(checkpointId);
+			coordinator.checkpointCoordinator(checkpointId, result);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			result.completeExceptionally(t);
+			globalFailureHandler.accept(t);
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index e0edbad..dc5f1dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -165,17 +165,18 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
 		ensureStarted();
-		return CompletableFuture.supplyAsync(() -> {
+
+		coordinatorExecutor.execute(() -> {
 			try {
 				LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);
-				return toBytes(checkpointId);
+				result.complete(toBytes(checkpointId));
 			} catch (Exception e) {
-				throw new CompletionException(
-						String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e);
+				result.completeExceptionally(new CompletionException(
+						String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e));
 			}
-		}, coordinatorExecutor);
+		});
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 919e6fe..dacf5fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -292,10 +292,8 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
-			requestedCheckpoint = checkpointFuture;
-			return checkpointFuture;
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			requestedCheckpoint = result;
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
index 5e47350..f4e8444 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -49,7 +49,7 @@ public final class MockOperatorCoordinator implements OperatorCoordinator {
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
 		throw new UnsupportedOperationException();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index fa4e7cf..29ec382 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -76,7 +76,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(1L);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(1L, checkpointFuture);
+
 		assertFalse(checkpointFuture.isDone());
 	}
 
@@ -87,7 +89,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 
 		final byte[] testData = new byte[] {11, 22, 33, 44};
 
-		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(9L);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(9L, checkpointFuture);
 		getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
 
 		assertTrue(checkpointFuture.isDone());
@@ -99,7 +102,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		holder.checkpointCoordinator(1L);
+		holder.checkpointCoordinator(1L, new CompletableFuture<>());
 		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
 
 		assertThat(sender.events, contains(
@@ -194,7 +197,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		final CompletableFuture<byte[]> holderFuture = holder.checkpointCoordinator(1000L);
+		final CompletableFuture<byte[]> holderFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(1000L, holderFuture);
+
 		final CompletableFuture<byte[]> future1 = getCoordinator(holder).getLastTriggeredCheckpoint();
 		holder.abortCurrentTriggering();
 
@@ -203,7 +208,6 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 
 		future1.complete(new byte[0]);
 
-		assertTrue(holderFuture.isCompletedExceptionally());
 		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0);
 
 		assertThat(sender.events, contains(
@@ -216,8 +220,10 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		holder.checkpointCoordinator(11L);
-		final CompletableFuture<?> future = holder.checkpointCoordinator(12L);
+		holder.checkpointCoordinator(11L, new CompletableFuture<>());
+
+		final CompletableFuture<byte[]> future = new CompletableFuture<>();
+		holder.checkpointCoordinator(12L, future);
 
 		assertTrue(future.isCompletedExceptionally());
 		assertNotNull(globalFailure);
@@ -314,7 +320,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		executor.triggerAll();
 
 		// trigger the checkpoint - this should also shut the valve as soon as the future is completed
-		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(0L, checkpointFuture);
 		executor.triggerAll();
 
 		// give the coordinator some time to emit some events
@@ -339,7 +346,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 			OperatorCoordinatorHolder holder,
 			long checkpointId) throws Exception {
 
-		final CompletableFuture<byte[]> future = holder.checkpointCoordinator(checkpointId);
+		final CompletableFuture<byte[]> future = new CompletableFuture<>();
+		holder.checkpointCoordinator(checkpointId, future);
 		getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
 		return future;
 	}
@@ -422,17 +430,15 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-			// we create the checkpoint future, but before returning it, we wait on a
-			// condition. that way, we simulate a "context switch" just at the time when the
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			// before returning from this method, we wait on a condition.
+			// that way, we simulate a "context switch" just at the time when the
 			// future would be returned and make the other thread complete the future and send an
 			// event before this method returns
-			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
 			lock.lock();
 			try {
-				checkpoint = checkpointFuture;
+				checkpoint = result;
 				condition.await();
-				return checkpointFuture;
 			} finally {
 				lock.unlock();
 			}
@@ -472,10 +478,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
-			checkpoint = checkpointFuture;
-			return checkpointFuture;
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			checkpoint = result;
 		}
 
 		@Override
@@ -525,7 +529,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
 
 		@Override
-		public abstract CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+		public abstract void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
 		@Override
 		public void checkpointComplete(long checkpointId) {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 6ec4dd7..353b6b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -689,7 +689,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
 			throw new Error(new TestException());
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index e914afe..e91bce3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -75,13 +75,9 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
-		final CompletableFuture<byte[]> coordinatorStateFuture = new CompletableFuture<>();
-
-		boolean added = triggeredCheckpoints.offer(coordinatorStateFuture);
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+		boolean added = triggeredCheckpoints.offer(result);
 		assert added; // guard the test assumptions
-
-		return coordinatorStateFuture;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index d92b9ba..be244fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
 import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
@@ -55,7 +56,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 				failureMessage, "The coordinator has not started yet.");
 		verifyException(() -> sourceCoordinator.subtaskFailed(0, null),
 				failureMessage, "The coordinator has not started yet.");
-		verifyException(() -> sourceCoordinator.checkpointCoordinator(100L),
+		verifyException(() -> sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
 				failureMessage, "The coordinator has not started yet.");
 	}
 
@@ -114,7 +115,10 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		sourceCoordinator.start();
 		sourceCoordinator.handleEventFromOperator(
 				0, new ReaderRegistrationEvent(0, "location_0"));
-		byte[] bytes = sourceCoordinator.checkpointCoordinator(100L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
+		final byte[] bytes = checkpointFuture.get();
 
 		// restore from the checkpoints.
 		SourceCoordinator<?, ?> restoredCoordinator = getNewSourceCoordinator();
@@ -136,11 +140,17 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		// Assign some splits to reader 0 then take snapshot 100.
 		sourceCoordinator.handleEventFromOperator(
 				0, new ReaderRegistrationEvent(0, "location_0"));
-		sourceCoordinator.checkpointCoordinator(100L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture1 = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(100L, checkpointFuture1);
+		checkpointFuture1.get();
 
 		// Add split 6, assign it to reader 0 and take another snapshot 101.
 		enumerator.addNewSplits(Collections.singletonList(new MockSourceSplit(6)));
-		sourceCoordinator.checkpointCoordinator(101L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture2 = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(101L, checkpointFuture2);
+		checkpointFuture2.get();
 
 		// check the state.
 		check(() -> {
@@ -185,7 +195,11 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		// Assign some splits to reader 0 then take snapshot 100.
 		sourceCoordinator.handleEventFromOperator(
 				0, new ReaderRegistrationEvent(0, "location_0"));
-		sourceCoordinator.checkpointCoordinator(100L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
+		checkpointFuture.get();
+
 		// Complete checkpoint 100.
 		sourceCoordinator.checkpointComplete(100L);
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index 6c84266..695e4bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -190,11 +190,12 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		ObjectOutputStream oos = new ObjectOutputStream(baos);
 		oos.writeObject(address);
-		return CompletableFuture.completedFuture(baos.toByteArray());
+
+		result.complete(baos.toByteArray());
 	}
 
 	@Override


[flink] 07/11: [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37f7db38290df065669178cc6407edd5055f1951
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:19:30 2020 +0200

    [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor
---
 .../coordination/OperatorCoordinatorHolder.java    | 132 +++--
 .../operators/coordination/OperatorEventValve.java |  62 ++-
 .../flink/runtime/scheduler/SchedulerBase.java     |   2 +-
 .../OperatorCoordinatorHolderTest.java             | 556 +++++++++++++++++++++
 .../coordination/OperatorEventValveTest.java       |  46 +-
 5 files changed, 725 insertions(+), 73 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 304eb20..9e68c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
@@ -57,19 +59,16 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
 
-	private static final long NO_CHECKPOINT = Long.MIN_VALUE;
-
 	private final OperatorCoordinator coordinator;
 	private final OperatorID operatorId;
 	private final LazyInitializedCoordinatorContext context;
-
 	private final OperatorEventValve eventValve;
 
-	// these two fields are needed for the construction of OperatorStateHandles when taking checkpoints
 	private final int operatorParallelism;
 	private final int operatorMaxParallelism;
 
-	private volatile long currentlyTriggeredCheckpoint;
+	private Consumer<Throwable> globalFailureHandler;
+	private ComponentMainThreadExecutor mainThreadExecutor;
 
 	private OperatorCoordinatorHolder(
 			final OperatorID operatorId,
@@ -85,17 +84,17 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 		this.eventValve = checkNotNull(eventValve);
 		this.operatorParallelism = operatorParallelism;
 		this.operatorMaxParallelism = operatorMaxParallelism;
-
-		this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
 	}
 
-	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-		lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor);
+	public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
+		lazyInitialize(scheduler::handleGlobalFailure, mainThreadExecutor);
 	}
 
 	@VisibleForTesting
-	void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
-		context.lazyInitialize(globalFailureHandler, schedulerExecutor);
+	void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {
+		this.globalFailureHandler = globalFailureHandler;
+		this.mainThreadExecutor = mainThreadExecutor;
+		context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
 	}
 
 	// ------------------------------------------------------------------------
@@ -127,6 +126,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public void start() throws Exception {
+		mainThreadExecutor.assertRunningInMainThread();
 		checkState(context.isInitialized(), "Coordinator Context is not yet initialized");
 		coordinator.start();
 	}
@@ -139,78 +139,112 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		mainThreadExecutor.assertRunningInMainThread();
 		coordinator.handleEventFromOperator(subtask, event);
 	}
 
 	@Override
 	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+		mainThreadExecutor.assertRunningInMainThread();
 		coordinator.subtaskFailed(subtask, reason);
 		eventValve.resetForTask(subtask);
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-		setCurrentlyTriggeredCheckpoint(checkpointId);
-
-		final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
-
-		// synchronously!!!, with the completion, we need to shut the event valve
-		checkpointFuture.whenComplete((ignored, failure) -> {
-			if (failure != null) {
-				abortCurrentTriggering();
-			} else {
-				onCheckpointStateFutureComplete(checkpointId);
-			}
-		});
-
-		return checkpointFuture;
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+		// unfortunately, this method does not run in the scheduler executor, but in the
+		// checkpoint coordinator timer thread.
+		// we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+		// main thread executor
+		final CompletableFuture<byte[]> future = new CompletableFuture<>();
+		mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, future));
+		return future;
 	}
 
 	@Override
 	public void checkpointComplete(long checkpointId) {
-		coordinator.checkpointComplete(checkpointId);
+		// unfortunately, this method does not run in the scheduler executor, but in the
+		// checkpoint coordinator timer thread.
+		// we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+		// main thread executor
+		mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId));
 	}
 
 	@Override
 	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
-		resetCheckpointTriggeringCheck();
+		// ideally we would like to check this here, however this method is called early during
+		// execution graph construction, before the main thread executor is set
+
 		eventValve.reset();
 		coordinator.resetToCheckpoint(checkpointData);
 	}
 
+	private void checkpointCompleteInternal(long checkpointId) {
+		mainThreadExecutor.assertRunningInMainThread();
+		coordinator.checkpointComplete(checkpointId);
+	}
+
+	private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final CompletableFuture<byte[]> checkpointFuture;
+		try {
+			eventValve.markForCheckpoint(checkpointId);
+			checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			result.completeExceptionally(t);
+			globalFailureHandler.accept(t);
+			return;
+		}
+
+		// synchronously!!!, with the completion, we need to shut the event valve
+		checkpointFuture.whenComplete((success, failure) -> {
+			if (failure != null) {
+				result.completeExceptionally(failure);
+			} else {
+				try {
+					eventValve.shutValve(checkpointId);
+					result.complete(success);
+				} catch (Exception e) {
+					result.completeExceptionally(e);
+				}
+			}
+		});
+	}
+
 	// ------------------------------------------------------------------------
 	//  Checkpointing Callbacks
 	// ------------------------------------------------------------------------
 
 	@Override
 	public void afterSourceBarrierInjection(long checkpointId) {
-		verifyNoOtherCheckpointBeingTriggered(checkpointId);
-		eventValve.openValve();
-		resetCheckpointTriggeringCheck();
-	}
+		// this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
 
-	@Override
-	public void abortCurrentTriggering() {
-		eventValve.openValve();
-		resetCheckpointTriggeringCheck();
-	}
+		// we ideally want the scheduler main-thread to be the one that sends the blocked events
+		// however, we need to react synchronously here, to maintain consistency and not allow
+		// another checkpoint injection in-between (unlikely, but possible).
+		// fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
+		// thread safe.
 
-	void onCheckpointStateFutureComplete(long checkpointId) {
-		verifyNoOtherCheckpointBeingTriggered(checkpointId);
-		eventValve.shutValve();
+		// this will automatically be fixed once the checkpoint coordinator runs in the
+		// scheduler's main thread executor
+		eventValve.openValveAndUnmarkCheckpoint();
 	}
 
-	private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
-		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
-	}
+	@Override
+	public void abortCurrentTriggering() {
+		// this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
 
-	private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
-		verifyNoOtherCheckpointBeingTriggered(checkpointId);
-		currentlyTriggeredCheckpoint = checkpointId;
-	}
+		// we ideally want the scheduler main-thread to be the one that sends the blocked events
+		// however, we need to react synchronously here, to maintain consistency and not allow
+		// another checkpoint injection in-between (unlikely, but possible).
+		// fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
+		// thread safe.
 
-	private void resetCheckpointTriggeringCheck() {
-		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+		// this will automatically be fixed once the checkpoint coordinator runs in the
+		// scheduler's main thread executor
+		eventValve.openValveAndUnmarkCheckpoint();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index bc1bfcf..f63df4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
@@ -41,7 +42,9 @@ import java.util.function.BiFunction;
  *
  * <p>This class is fully thread safe, under the assumption that the event sender is thread-safe.
  */
-public final class OperatorEventValve {
+final class OperatorEventValve {
+
+	private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
 	private final Object lock = new Object();
 
@@ -52,6 +55,12 @@ public final class OperatorEventValve {
 	private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
 
 	@GuardedBy("lock")
+	private long currentCheckpointId;
+
+	@GuardedBy("lock")
+	private long lastCheckpointId;
+
+	@GuardedBy("lock")
 	private boolean shut;
 
 	/**
@@ -60,6 +69,8 @@ public final class OperatorEventValve {
 	 */
 	public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) {
 		this.eventSender = eventSender;
+		this.currentCheckpointId = NO_CHECKPOINT;
+		this.lastCheckpointId = Long.MIN_VALUE;
 	}
 
 	// ------------------------------------------------------------------------
@@ -96,21 +107,52 @@ public final class OperatorEventValve {
 	 * Shuts the value. All events sent through this valve are blocked until the valve is re-opened.
 	 * If the valve is already shut, this does nothing.
 	 */
-	public void shutValve() {
-		// synchronized block for visibility
+	public void markForCheckpoint(long checkpointId) {
 		synchronized (lock) {
-			shut = true;
+			if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+				throw new IllegalStateException(String.format(
+						"Cannot mark for checkpoint %d, already marked for checkpoint %d",
+						checkpointId, currentCheckpointId));
+			}
+			if (checkpointId > lastCheckpointId) {
+				currentCheckpointId = checkpointId;
+				lastCheckpointId = checkpointId;
+			} else {
+				throw new IllegalStateException(String.format(
+						"Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+						lastCheckpointId, checkpointId));
+			}
+		}
+	}
+
+	/**
+	 * Shuts the valve. All events sent through this valve are blocked until the valve is re-opened.
+	 * If the valve is already shut, this does nothing.
+	 */
+	public void shutValve(long checkpointId) {
+		synchronized (lock) {
+			if (checkpointId == currentCheckpointId) {
+				shut = true;
+			} else {
+				throw new IllegalStateException(String.format(
+					"Cannot shut valve for non-prepared checkpoint. " +
+					"Prepared checkpoint = %s, attempting-to-close checkpoint = %d",
+					(currentCheckpointId == NO_CHECKPOINT ? "(none)" : String.valueOf(currentCheckpointId)),
+					checkpointId));
+			}
 		}
 	}
 
 	/**
 	 * Opens the value, releasing all buffered events.
 	 */
-	public void openValve() {
+	public void openValveAndUnmarkCheckpoint() {
 		final ArrayList<FuturePair> futures;
 
 		// send all events under lock, so that no new event can sneak between
 		synchronized (lock) {
+			currentCheckpointId = NO_CHECKPOINT;
+
 			if (!shut) {
 				return;
 			}
@@ -129,14 +171,7 @@ public final class OperatorEventValve {
 
 		// apply the logic on the future outside the lock, to be safe
 		for (FuturePair pair : futures) {
-			final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture;
-			pair.ackFuture.whenComplete((success, failure) -> {
-				if (failure != null) {
-					originalFuture.completeExceptionally(failure);
-				} else {
-					originalFuture.complete(success);
-				}
-			});
+			FutureUtils.forward(pair.ackFuture, pair.originalFuture);
 		}
 	}
 
@@ -165,6 +200,7 @@ public final class OperatorEventValve {
 			}
 			blockedEvents.clear();
 			shut = false;
+			currentCheckpointId = NO_CHECKPOINT;
 		}
 
 		failAllFutures(events);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ce1d8a1..1a10d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -970,7 +970,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 		}
 	}
 
-	private void initializeOperatorCoordinators(Executor mainThreadExecutor) {
+	private void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) {
 		for (OperatorCoordinatorHolder coordinatorHolder : getAllCoordinators()) {
 			coordinatorHolder.lazyInitialize(this, mainThreadExecutor);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
new file mode 100644
index 0000000..fa4e7cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A test that ensures the before/after conditions around event sending and checkpoints are met,
+ * including under concurrency.
+ */
+@SuppressWarnings("serial")
+public class OperatorCoordinatorHolderTest extends TestLogger {
+
+	private final Consumer<Throwable> globalFailureHandler = (t) -> globalFailure = t;
+	private Throwable globalFailure;
+
+	@After
+	public void checkNoGlobalFailure() throws Exception {
+		if (globalFailure != null) {
+			ExceptionUtils.rethrowException(globalFailure);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void checkpointFutureInitiallyNotDone() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(1L);
+		assertFalse(checkpointFuture.isDone());
+	}
+
+	@Test
+	public void completedCheckpointFuture() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		final byte[] testData = new byte[] {11, 22, 33, 44};
+
+		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(9L);
+		getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
+
+		assertTrue(checkpointFuture.isDone());
+		assertArrayEquals(testData, checkpointFuture.get());
+	}
+
+	@Test
+	public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		holder.checkpointCoordinator(1L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(1), 1)
+		));
+	}
+
+	@Test
+	public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 10L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+
+		assertTrue(sender.events.isEmpty());
+	}
+
+	@Test
+	public void abortedCheckpointReleasesBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 123L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+		holder.abortCurrentTriggering();
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(1337), 0)
+		));
+	}
+
+	@Test
+	public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1111L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+		holder.afterSourceBarrierInjection(1111L);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(1337), 0)
+		));
+	}
+
+	@Test
+	public void failedTasksDropsBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1000L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+		holder.subtaskFailed(1, null);
+		holder.abortCurrentTriggering();
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(0), 0)
+		));
+	}
+
+	@Test
+	public void restoreOpensValveEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1000L);
+		holder.resetToCheckpoint(new byte[0]);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(999), 1);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(999), 1)
+		));
+	}
+
+	@Test
+	public void restoreDropsBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1000L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+		holder.resetToCheckpoint(new byte[0]);
+
+		assertTrue(sender.events.isEmpty());
+	}
+
+	@Test
+	public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		final CompletableFuture<byte[]> holderFuture = holder.checkpointCoordinator(1000L);
+		final CompletableFuture<byte[]> future1 = getCoordinator(holder).getLastTriggeredCheckpoint();
+		holder.abortCurrentTriggering();
+
+		triggerAndCompleteCheckpoint(holder, 1010L);
+		holder.afterSourceBarrierInjection(1010L);
+
+		future1.complete(new byte[0]);
+
+		assertTrue(holderFuture.isCompletedExceptionally());
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(123), 0)
+		));
+	}
+
+	@Test
+	public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		holder.checkpointCoordinator(11L);
+		final CompletableFuture<?> future = holder.checkpointCoordinator(12L);
+
+		assertTrue(future.isCompletedExceptionally());
+		assertNotNull(globalFailure);
+		globalFailure = null;
+	}
+
+	@Test
+	public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+
+		triggerAndCompleteCheckpoint(holder, 22L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
+		holder.afterSourceBarrierInjection(22L);
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);
+
+		triggerAndCompleteCheckpoint(holder, 23L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
+		holder.afterSourceBarrierInjection(23L);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(0), 0),
+			new EventWithSubtask(new TestOperatorEvent(1), 0),
+			new EventWithSubtask(new TestOperatorEvent(2), 0),
+			new EventWithSubtask(new TestOperatorEvent(3), 0)
+		));
+	}
+
+	@Test
+	public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+
+		triggerAndCompleteCheckpoint(holder, 22L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
+		holder.abortCurrentTriggering();
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);
+
+		triggerAndCompleteCheckpoint(holder, 23L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
+		holder.afterSourceBarrierInjection(23L);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(0), 0),
+			new EventWithSubtask(new TestOperatorEvent(1), 0),
+			new EventWithSubtask(new TestOperatorEvent(2), 0),
+			new EventWithSubtask(new TestOperatorEvent(3), 0)
+		));
+	}
+
+	/**
+	 * This test verifies that the order of Checkpoint Completion and Event Sending observed from the
+	 * outside matches that from within the OperatorCoordinator.
+	 *
+	 * <p>Extreme case 1: The coordinator immediately completes the checkpoint future and sends an
+	 * event directly after that.
+	 */
+	@Test
+	public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
+		checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new);
+	}
+
+	/**
+	 * This test verifies that the order of Checkpoint Completion and Event Sending observed from the
+	 * outside matches that from within the OperatorCoordinator.
+	 *
+	 * <p>Extreme case 2: After the checkpoint triggering, the coordinator flushes a bunch of events
+	 * before completing the checkpoint future.
+	 */
+	@Test
+	public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
+		checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new);
+	}
+
+	private void checkpointEventValueAtomicity(
+			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
+
+		final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
+		final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(
+				(ScheduledExecutorService) executor, Thread.currentThread());
+
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(
+				sender, coordinatorCtor, mainThreadExecutor);
+
+		// give the coordinator some time to emit some events
+		Thread.sleep(new Random().nextInt(10) + 20);
+		executor.triggerAll();
+
+		// trigger the checkpoint - this should also shut the valve as soon as the future is completed
+		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L);
+		executor.triggerAll();
+
+		// give the coordinator some time to emit some events
+		Thread.sleep(new Random().nextInt(10) + 10);
+		holder.close();
+		executor.triggerAll();
+
+		assertTrue(checkpointFuture.isDone());
+		final int checkpointedNumber = bytesToInt(checkpointFuture.get());
+
+		assertEquals(checkpointedNumber, sender.events.size());
+		for (int i = 0; i < checkpointedNumber; i++) {
+			assertEquals(i, ((TestOperatorEvent) sender.events.get(i).event).getValue());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//   test actions
+	// ------------------------------------------------------------------------
+
+	private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(
+			OperatorCoordinatorHolder holder,
+			long checkpointId) throws Exception {
+
+		final CompletableFuture<byte[]> future = holder.checkpointCoordinator(checkpointId);
+		getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+		return future;
+	}
+
+	// ------------------------------------------------------------------------
+	//   miscellaneous helpers
+	// ------------------------------------------------------------------------
+
+	/** Encodes {@code value} as 4 bytes in big-endian order (the default order of {@link ByteBuffer}). */
+	static byte[] intToBytes(int value) {
+		return ByteBuffer.allocate(4).putInt(value).array();
+	}
+
+	/**
+	 * Decodes an int from the first 4 bytes of {@code bytes} in big-endian order; the inverse of
+	 * {@link #intToBytes(int)}. Throws {@link java.nio.BufferUnderflowException} for fewer than 4 bytes.
+	 */
+	static int bytesToInt(byte[] bytes) {
+		return ByteBuffer.wrap(bytes).getInt();
+	}
+
+	/**
+	 * Returns the coordinator wrapped by {@code holder}, cast to {@link TestingOperatorCoordinator};
+	 * throws {@link ClassCastException} if the holder wraps a coordinator of another type.
+	 */
+	private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) {
+		return (TestingOperatorCoordinator) holder.coordinator();
+	}
+
+	/**
+	 * Same as the three-argument variant, using
+	 * {@code ComponentMainThreadExecutorServiceAdapter.forMainThread()} as the main-thread executor.
+	 */
+	private OperatorCoordinatorHolder createCoordinatorHolder(
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
+
+		return createCoordinatorHolder(
+				eventSender,
+				coordinatorCtor,
+				ComponentMainThreadExecutorServiceAdapter.forMainThread());
+	}
+
+	/**
+	 * Creates an {@link OperatorCoordinatorHolder} for a fresh {@link OperatorID}, whose coordinator is
+	 * built by {@code coordinatorCtor}. Initializes the holder with this test's
+	 * {@code globalFailureHandler} and the given main-thread executor, and starts it.
+	 *
+	 * @param eventSender sink for outgoing events; its second argument is the target subtask index
+	 * @param coordinatorCtor factory creating the coordinator from the context the holder provides
+	 * @param mainThreadExecutor executor passed to {@code lazyInitialize}
+	 * @return the started holder
+	 */
+	private OperatorCoordinatorHolder createCoordinatorHolder(
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor,
+			final ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
+
+		final OperatorID opId = new OperatorID();
+		final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
+			@Override
+			public OperatorID getOperatorId() {
+				return opId;
+			}
+
+			@Override
+			public OperatorCoordinator create(OperatorCoordinator.Context context) {
+				return coordinatorCtor.apply(context);
+			}
+		};
+
+		// NOTE(review): 3 and 1775 are presumably the operator parallelism and max parallelism;
+		// confirm against the signature of OperatorCoordinatorHolder.create(...)
+		final OperatorCoordinatorHolder holder = OperatorCoordinatorHolder.create(
+				opId,
+				provider,
+				eventSender,
+				"test-coordinator-name",
+				3,
+				1775);
+
+		holder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
+		holder.start();
+
+		return holder;
+	}
+
+	// ------------------------------------------------------------------------
+	//   test implementations
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test coordinator that provokes the race in which the checkpoint future is completed, and another
+	 * event is sent, before {@link #checkpointCoordinator(long)} has returned the future to its caller.
+	 *
+	 * <p>The triggering thread publishes the future and then parks on {@code condition}. The next
+	 * {@link #step()} on the coordinator thread does three things in order: it completes the future
+	 * with the number of events sent so far, sends the next event while still holding the lock, and
+	 * only then signals the parked thread.
+	 */
+	private static final class FutureCompletedInstantlyTestCoordinator extends CheckpointEventOrderTestBaseCoordinator {
+
+		// fair lock guarding the hand-over of the checkpoint future between the two threads
+		private final ReentrantLock lock = new ReentrantLock(true);
+		private final Condition condition = lock.newCondition();
+
+		@Nullable
+		@GuardedBy("lock")
+		private CompletableFuture<byte[]> checkpoint;
+
+		// number of events sent so far; only touched by step() on the coordinator thread
+		private int num;
+
+		FutureCompletedInstantlyTestCoordinator(Context context) {
+			super(context);
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			// we create the checkpoint future, but before returning it, we wait on a
+			// condition. that way, we simulate a "context switch" just at the time when the
+			// future would be returned and make the other thread complete the future and send an
+			// event before this method returns
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			lock.lock();
+			try {
+				checkpoint = checkpointFuture;
+				// NOTE(review): await() is not wrapped in a predicate loop, so a spurious wakeup returns
+				// early (presumably the race is then just not provoked). It also has no timeout, so this
+				// blocks indefinitely if step() is no longer being called.
+				condition.await();
+				return checkpointFuture;
+			} finally {
+				lock.unlock();
+			}
+		}
+
+		@Override
+		protected void step() throws Exception {
+			lock.lock();
+			try {
+				// if there is a checkpoint to complete, we complete it and immediately
+				// try to send another event, without releasing the lock. that way we
+				// force the situation as if the checkpoint get completed and an event gets
+				// sent while the triggering thread is stalled
+				if (checkpoint != null) {
+					checkpoint.complete(intToBytes(num));
+					checkpoint = null;
+				}
+				context.sendEvent(new TestOperatorEvent(num++), 0);
+				condition.signalAll();
+			} finally {
+				lock.unlock();
+			}
+
+			Thread.sleep(2);
+		}
+	}
+
+	/**
+	 * Test coordinator that sends three events per step, to subtasks 0, 1 and 2, numbered 0, 1, 2, ...
+	 * in send order. A pending checkpoint future is completed only after such a batch, with the number
+	 * of events sent so far as the checkpoint state. The future is completed on the coordinator thread.
+	 */
+	private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator {
+
+		// NOTE(review): written by the thread calling checkpointCoordinator(...) and read/cleared by the
+		// coordinator thread in step(), without volatile or locking. No happens-before edge between the
+		// write and the read is visible here; consider making this field volatile.
+		@Nullable
+		private CompletableFuture<byte[]> checkpoint;
+
+		// number of events sent so far; only touched by step() on the coordinator thread
+		private int num;
+
+		FutureCompletedAfterSendingEventsCoordinator(Context context) {
+			super(context);
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			checkpoint = checkpointFuture;
+			return checkpointFuture;
+		}
+
+		@Override
+		protected void step() throws Exception {
+			Thread.sleep(2);
+
+			context.sendEvent(new TestOperatorEvent(num++), 0);
+			context.sendEvent(new TestOperatorEvent(num++), 1);
+			context.sendEvent(new TestOperatorEvent(num++), 2);
+
+			// the checkpoint state is the number of events sent before its completion
+			if (checkpoint != null) {
+				checkpoint.complete(intToBytes(num));
+				checkpoint = null;
+			}
+		}
+	}
+
+	/**
+	 * Base class for the event-order test coordinators. {@link #start()} spawns a thread that calls
+	 * {@link #step()} in a loop until {@link #close()}. All other coordinator callbacks are no-ops,
+	 * and {@link #checkpointCoordinator(long)} is left to subclasses.
+	 */
+	private abstract static class CheckpointEventOrderTestBaseCoordinator implements OperatorCoordinator, Runnable {
+
+		private final Thread coordinatorThread;
+
+		protected final Context context;
+
+		// set by close() on the closing thread, read by the loop in run() on the coordinator thread
+		private volatile boolean closed;
+
+		CheckpointEventOrderTestBaseCoordinator(Context context) {
+			this.context = context;
+			this.coordinatorThread = new Thread(this);
+		}
+
+		@Override
+		public void start() throws Exception {
+			coordinatorThread.start();
+		}
+
+		@Override
+		public void close() throws Exception {
+			// set the flag before interrupting, so that run() treats the resulting exception
+			// (e.g. an InterruptedException from a sleep) as a regular shutdown
+			closed = true;
+			coordinatorThread.interrupt();
+			coordinatorThread.join();
+		}
+
+		@Override
+		public void handleEventFromOperator(int subtask, OperatorEvent event){}
+
+		@Override
+		public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
+
+		@Override
+		public abstract CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+
+		@Override
+		public void checkpointComplete(long checkpointId) {}
+
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {}
+
+		@Override
+		public void run() {
+			try {
+				while (!closed) {
+					step();
+				}
+			} catch (Throwable t) {
+				// exceptions after close() are the expected result of the interrupt
+				if (closed) {
+					return;
+				}
+
+				// NOTE(review): System.exit terminates the whole JVM, including any other tests running
+				// in it; handing the error over to the test thread would fail only this test.
+				// this should never happen, but just in case, print and crash the test
+				//noinspection CallToPrintStackTrace
+				t.printStackTrace();
+				System.exit(-1);
+			}
+		}
+
+		/** One iteration of the coordinator thread's loop; a thrown exception ends the loop (see {@link #run()}). */
+		protected abstract void step() throws Exception;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
index 9b0325d..a380958 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -50,11 +50,30 @@ public class OperatorEventValveTest {
 		assertTrue(future.isDone());
 	}
 
+	/** Shutting the valve for a checkpoint that was never marked must be rejected. */
+	@Test(expected = IllegalStateException.class)
+	public void errorShuttingUnmarkedValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+
+		valve.shutValve(123L);
+	}
+
+	/** Shutting the valve for a checkpoint other than the currently marked one must be rejected. */
+	@Test(expected = IllegalStateException.class)
+	public void errorShuttingValveForOtherMark() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+
+		valve.markForCheckpoint(100L);
+		valve.shutValve(123L);
+	}
+
 	@Test
 	public void eventsBlockedByClosedValve() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+
+		valve.markForCheckpoint(1L);
+		valve.shutValve(1L);
 
 		final CompletableFuture<Acknowledge> future =
 				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
@@ -67,14 +86,16 @@ public class OperatorEventValveTest {
 	public void eventsReleasedAfterOpeningValve() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		final OperatorEvent event1 = new TestOperatorEvent();
 		final OperatorEvent event2 = new TestOperatorEvent();
 		final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 3);
 		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 0);
 
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertThat(sender.events, containsInAnyOrder(
 			new EventWithSubtask(event1, 3),
@@ -88,11 +109,13 @@ public class OperatorEventValveTest {
 	public void releasedEventsForwardSendFailures() throws Exception {
 		final TestEventSender sender = new TestEventSender(new FlinkException("test"));
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		final CompletableFuture<Acknowledge> future =
 				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertTrue(future.isCompletedExceptionally());
 	}
@@ -101,13 +124,14 @@ public class OperatorEventValveTest {
 	public void resetDropsAllEvents() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
 		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
 
 		valve.reset();
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertTrue(sender.events.isEmpty());
 	}
@@ -116,7 +140,8 @@ public class OperatorEventValveTest {
 	public void resetForTaskDropsSelectiveEvents() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		final OperatorEvent event1 = new TestOperatorEvent();
 		final OperatorEvent event2 = new TestOperatorEvent();
@@ -124,7 +149,7 @@ public class OperatorEventValveTest {
 		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 1);
 
 		valve.resetForTask(1);
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertThat(sender.events, contains(new EventWithSubtask(event1, 0)));
 		assertTrue(future1.isDone());
@@ -136,7 +161,8 @@ public class OperatorEventValveTest {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
 
-		valve.shutValve();
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 		valve.reset();
 
 		assertFalse(valve.isShut());


[flink] 04/11: [FLINK-16986][coordination] (part 1) Provide exactly-once guarantees around checkpoints and operator event sending.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a721d855998a7c91415312d4890d7d4f916e163
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 18 01:46:32 2020 +0200

    [FLINK-16986][coordination] (part 1) Provide exactly-once guarantees around checkpoints and operator event sending.
    
    This closes #12234
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   8 +-
 .../OperatorCoordinatorCheckpointContext.java      |  50 +-
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java     |   9 +-
 .../coordination/OperatorCoordinatorHolder.java    | 109 ++++-
 .../operators/coordination/OperatorEventValve.java | 209 +++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |   4 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |   8 +-
 .../CoordinatorEventsExactlyOnceITCase.java        | 501 +++++++++++++++++++++
 .../MockOperatorCoordinatorCheckpointContext.java} |  60 +--
 .../OperatorCoordinatorSchedulerTest.java          |   4 +-
 .../coordination/OperatorEventValveTest.java       | 144 ++++++
 .../operators/coordination/TestEventSender.java    | 113 +++++
 .../operators/coordination/TestOperatorEvent.java  |  33 ++
 14 files changed, 1175 insertions(+), 89 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b283a2a..51b98f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -545,13 +545,17 @@ public class CheckpointCoordinator {
 
 						if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
 							// no exception, no discarding, everything is OK
+							final long checkpointId = checkpoint.getCheckpointId();
 							snapshotTaskState(
 								timestamp,
-								checkpoint.getCheckpointId(),
+								checkpointId,
 								checkpoint.getCheckpointStorageLocation(),
 								request.props,
 								executions,
 								request.advanceToEndOfTime);
+
+							coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
 							onTriggerSuccess();
 						} else {
 								// the initialization might not be finished yet
@@ -777,6 +781,8 @@ public class CheckpointCoordinator {
 		throwable = ExceptionUtils.stripCompletionException(throwable);
 
 		try {
+			coordinatorsToCheckpoint.forEach(OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
+
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 				LOG.warn(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 16acbb2..92fd2aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -24,51 +24,41 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import java.util.Collection;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link OperatorCoordinator} and its contextual information needed to trigger and
  * acknowledge a checkpoint.
  */
-public final class OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext {
 
-	private final OperatorCoordinator coordinator;
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
 
-	private final OperatorID operatorId;
+	OperatorCoordinator coordinator();
 
-	private final int maxParallelism;
+	OperatorID operatorId();
 
-	private final int currentParallelism;
+	int maxParallelism();
 
-	public OperatorCoordinatorCheckpointContext(
-			OperatorCoordinator coordinator,
-			OperatorID operatorId,
-			int maxParallelism,
-			int currentParallelism) {
+	int currentParallelism();
 
-		this.coordinator = checkNotNull(coordinator);
-		this.operatorId = checkNotNull(operatorId);
-		this.maxParallelism = maxParallelism;
-		this.currentParallelism = currentParallelism;
-	}
+	// ------------------------------------------------------------------------
+	//  checkpoint triggering callbacks
+	// ------------------------------------------------------------------------
 
-	public OperatorCoordinator coordinator() {
-		return coordinator;
-	}
+	void onCallTriggerCheckpoint(long checkpointId);
 
-	public OperatorID operatorId() {
-		return operatorId;
-	}
+	void onCheckpointStateFutureComplete(long checkpointId);
 
-	public int maxParallelism() {
-		return maxParallelism;
-	}
+	void afterSourceBarrierInjection(long checkpointId);
 
-	public int currentParallelism() {
-		return currentParallelism;
-	}
+	void abortCurrentTriggering();
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
 
-	public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+	static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
 		return infos.stream()
 			.map(OperatorCoordinatorCheckpointContext::operatorId)
 			.collect(Collectors.toList());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 39ac10f..68fc8f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -58,7 +58,17 @@ final class OperatorCoordinatorCheckpoints {
 		final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
 
 		for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
-			individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+			final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
+			coordinator.onCallTriggerCheckpoint(checkpointId);
+
+			individualSnapshots.add(checkpointFuture);
+			checkpointFuture.whenComplete((ignored, failure) -> {
+				if (failure != null) {
+					coordinator.abortCurrentTriggering();
+				} else {
+					coordinator.onCheckpointStateFutureComplete(checkpointId);
+				}
+			});
 		}
 
 		return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b84a86f..dfd20ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.scheduler.InternalFailuresListener;
 import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -569,13 +568,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
 		final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
 		for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
-			for (final OperatorCoordinatorHolder coordinator : vertex.getOperatorCoordinators()) {
-				contexts.add(new OperatorCoordinatorCheckpointContext(
-						coordinator,
-						coordinator.getOperatorId(),
-						vertex.getMaxParallelism(),
-						vertex.getParallelism()));
-			}
+			contexts.addAll(vertex.getOperatorCoordinators());
 		}
 		contexts.trimToSize();
 		return contexts;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 43c47fe..c70e229 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -51,34 +53,99 @@ import static org.apache.flink.util.Preconditions.checkState;
  * However, the real Coordinators can only be created after SchedulerNG was created, because they need
  * a reference to it for the failure calls.
  */
-public class OperatorCoordinatorHolder implements OperatorCoordinator {
+public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
+
+	/** Sentinel for {@link #currentlyTriggeredCheckpoint} meaning that no checkpoint is being triggered. */
+	private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
 	private final OperatorCoordinator coordinator;
 	private final OperatorID operatorId;
 	private final LazyInitializedCoordinatorContext context;
 
+	// gate for all events the coordinator sends: shut once the coordinator's checkpoint state future
+	// completes, re-opened by afterSourceBarrierInjection(...) or abortCurrentTriggering()
+	private final OperatorEventValve eventValve;
+
+	// these two fields are needed for the construction of OperatorStateHandles when taking checkpoints
+	private final int operatorParallelism;
+	private final int operatorMaxParallelism;
+
+	// ID of the checkpoint currently being triggered, or NO_CHECKPOINT.
+	// NOTE(review): plain (non-volatile, unsynchronized) field. onCheckpointStateFutureComplete(...)
+	// and abortCurrentTriggering() may run in a whenComplete(...) callback on whichever thread completes
+	// the coordinator's state future (see OperatorCoordinatorCheckpoints). onCallTriggerCheckpoint(...)
+	// runs on the triggering thread. Confirm that all accesses are confined to one thread, or synchronize.
+	private long currentlyTriggeredCheckpoint;
 
+	/**
+	 * Creates a holder for an already created coordinator. {@code context} and {@code eventValve} are
+	 * expected to be the ones the coordinator was created with, as in the factory code that
+	 * deserializes the provider, so that events sent by the coordinator pass through
+	 * {@code eventValve}. Initially no checkpoint is being triggered.
+	 */
 	private OperatorCoordinatorHolder(
 			final OperatorID operatorId,
 			final OperatorCoordinator coordinator,
-			final LazyInitializedCoordinatorContext context) {
+			final LazyInitializedCoordinatorContext context,
+			final OperatorEventValve eventValve,
+			final int operatorParallelism,
+			final int operatorMaxParallelism) {
 
 		this.operatorId = checkNotNull(operatorId);
 		this.coordinator = checkNotNull(coordinator);
 		this.context = checkNotNull(context);
+		this.eventValve = checkNotNull(eventValve);
+		this.operatorParallelism = operatorParallelism;
+		this.operatorMaxParallelism = operatorMaxParallelism;
+
+		this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
 	}
 
+	/**
+	 * Supplies the scheduler and its executor, which only exist after this holder was created;
+	 * delegates to the lazily initialized coordinator context.
+	 */
+	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+		context.lazyInitialize(scheduler, schedulerExecutor);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
 	// ------------------------------------------------------------------------
 
-	public OperatorID getOperatorId() {
+	/** Returns the ID of the operator whose coordinator this holder wraps. */
+	@Override
+	public OperatorID operatorId() {
 		return operatorId;
 	}
 
-	public OperatorCoordinator getCoordinator() {
+	/** Returns the wrapped coordinator itself, not this holder. */
+	@Override
+	public OperatorCoordinator coordinator() {
 		return coordinator;
 	}
 
-	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-		context.lazyInitialize(scheduler, schedulerExecutor);
+	/** Returns the operator's max parallelism as passed at construction (the job vertex's max parallelism). */
+	@Override
+	public int maxParallelism() {
+		return operatorMaxParallelism;
+	}
+
+	/** Returns the operator's parallelism as passed at construction (the job vertex's parallelism). */
+	@Override
+	public int currentParallelism() {
+		return operatorParallelism;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing Callbacks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Records that checkpoint {@code checkpointId} is being triggered on this coordinator.
+	 * Throws {@link IllegalStateException} if a different checkpoint is already in progress.
+	 *
+	 * <p>NOTE(review): OperatorCoordinatorCheckpoints calls this only after the coordinator has returned
+	 * its state future. That future can therefore already be complete here, and events the coordinator
+	 * sent after completing it can already have passed the still-open valve before
+	 * onCheckpointStateFutureComplete(...) shuts it.
+	 */
+	@Override
+	public void onCallTriggerCheckpoint(long checkpointId) {
+		checkCheckpointAlreadyHappening(checkpointId);
+		currentlyTriggeredCheckpoint = checkpointId;
+	}
+
+	/**
+	 * Called when the coordinator's checkpoint state future has completed successfully. Shuts the
+	 * event valve, so that events sent from now on are buffered until
+	 * {@link #afterSourceBarrierInjection(long)} or {@link #abortCurrentTriggering()} re-opens it.
+	 */
+	@Override
+	public void onCheckpointStateFutureComplete(long checkpointId) {
+		checkCheckpointAlreadyHappening(checkpointId);
+		eventValve.shutValve();
+	}
+
+	/**
+	 * Called by the CheckpointCoordinator after it has triggered the task snapshots for
+	 * {@code checkpointId}. Re-opens the valve, releasing the buffered events, and clears the
+	 * in-progress checkpoint.
+	 */
+	@Override
+	public void afterSourceBarrierInjection(long checkpointId) {
+		checkCheckpointAlreadyHappening(checkpointId);
+		eventValve.openValve();
+		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+	}
+
+	/**
+	 * Aborts whatever checkpoint is currently being triggered. Re-opens the valve, releasing any
+	 * buffered events, and clears the in-progress checkpoint. Harmless when nothing is in progress,
+	 * since opening an open valve does nothing.
+	 */
+	@Override
+	public void abortCurrentTriggering() {
+		eventValve.openValve();
+		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+	}
+
+	/**
+	 * Checks that either no checkpoint is in progress or the one in progress is {@code checkpointId};
+	 * otherwise throws {@link IllegalStateException}.
+	 *
+	 * <p>NOTE(review): despite the name, this passes when no checkpoint is happening. The failure also
+	 * carries no message; including both checkpoint IDs would ease debugging.
+	 */
+	private void checkCheckpointAlreadyHappening(long checkpointId) {
+		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
 	}
 
 	// ------------------------------------------------------------------------
@@ -105,6 +172,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 	@Override
 	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
 		coordinator.subtaskFailed(subtask, reason);
+		// drop events still buffered for the failed subtask; their send futures complete exceptionally
+		eventValve.resetForTask(subtask);
 	}
 
 	@Override
@@ -134,9 +202,24 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 		try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
 			final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
 			final OperatorID opId = provider.getOperatorId();
-			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex);
+
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender =
+				(serializedEvent, subtask) -> {
+					final Execution executionAttempt = jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
+					return executionAttempt.sendOperatorEvent(opId, serializedEvent);
+				};
+
+			final OperatorEventValve valve = new OperatorEventValve(eventSender);
+			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve);
 			final OperatorCoordinator coordinator = provider.create(context);
-			return new OperatorCoordinatorHolder(opId, coordinator, context);
+
+			return new OperatorCoordinatorHolder(
+					opId,
+					coordinator,
+					context,
+					valve,
+					jobVertex.getParallelism(),
+					jobVertex.getMaxParallelism());
 		}
 	}
 
@@ -157,13 +240,18 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 
 		private final OperatorID operatorId;
 		private final ExecutionJobVertex jobVertex;
+		private final OperatorEventValve eventValve;
 
 		private SchedulerNG scheduler;
 		private Executor schedulerExecutor;
 
-		public LazyInitializedCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
+		/**
+		 * Creates the context. Scheduler and executor are supplied later via {@code lazyInitialize}.
+		 *
+		 * @param eventValve valve through which all events sent via this context are routed
+		 */
+		public LazyInitializedCoordinatorContext(
+				OperatorID operatorId,
+				ExecutionJobVertex jobVertex,
+				OperatorEventValve eventValve) {
 			this.operatorId = checkNotNull(operatorId);
 			this.jobVertex = checkNotNull(jobVertex);
+			this.eventValve = checkNotNull(eventValve);
 		}
 
 		void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
@@ -208,8 +296,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 				throw new FlinkRuntimeException("Cannot serialize operator event", e);
 			}
 
-			final Execution executionAttempt = jobVertex.getTaskVertices()[targetSubtask].getCurrentExecutionAttempt();
-			return executionAttempt.sendOperatorEvent(operatorId, serializedEvent);
+			return eventValve.sendEvent(serializedEvent, targetSubtask);
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
new file mode 100644
index 0000000..bc1bfcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * The event valve is the connection through which operator events are sent, from coordinator to
+ * operator. It can temporarily block events from going through, buffering them, and releasing them
+ * later.
+ *
+ * <p>The valve can also drop buffered events for all or selected targets.
+ *
+ * <p>This class is fully thread safe, under the assumption that the event sender is thread-safe.
+ * Note that the event sender is invoked while the valve's internal lock is held.
+ *
+ * <p>Buffered events are kept per target subtask. When the valve is opened, they are released
+ * subtask by subtask, in the order in which the subtasks first had an event buffered. Within a
+ * subtask, events are released in the order in which they were sent. There is no ordering guarantee
+ * across subtasks.
+ */
+public final class OperatorEventValve {
+
+	private final Object lock = new Object();
+
+	// the reference itself is final; the annotation documents that the sender is only invoked under the lock
+	@GuardedBy("lock")
+	private final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender;
+
+	/** Events held back while the valve is shut, keyed by target subtask index, in insertion order. */
+	@GuardedBy("lock")
+	private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
+
+	/** Whether the valve is shut, i.e. whether {@link #sendEvent} buffers events instead of sending them. */
+	@GuardedBy("lock")
+	private boolean shut;
+
+	/**
+	 * Constructs a new OperatorEventValve, passing the events to the given function when the valve is open
+	 * or opened again. The second parameter of the BiFunction is the target operator subtask index.
+	 * The valve starts out open.
+	 */
+	public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) {
+		this.eventSender = eventSender;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Returns whether the valve is currently shut. */
+	public boolean isShut() {
+		// synchronized block for visibility
+		synchronized (lock) {
+			return shut;
+		}
+	}
+
+	/**
+	 * Sends the event directly if the valve is open, and returns the original sending result future.
+	 *
+	 * <p>If the valve is shut, this buffers the event and returns an incomplete future. The future is completed
+	 * with the original result once the valve is opened. If the event is never sent, because it gets dropped
+	 * through a call to {@link #reset()} or {@link #resetForTask(int)}, then the returned future will be
+	 * completed exceptionally.
+	 */
+	public CompletableFuture<Acknowledge> sendEvent(SerializedValue<OperatorEvent> event, int subtask) {
+		synchronized (lock) {
+			if (!shut) {
+				return eventSender.apply(event, subtask);
+			}
+
+			final List<BlockedEvent> eventsForTask = blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
+			final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+			eventsForTask.add(new BlockedEvent(event, subtask, future));
+			return future;
+		}
+	}
+
+	/**
+	 * Shuts the valve. All events sent through this valve are blocked until the valve is re-opened.
+	 * If the valve is already shut, this does nothing.
+	 */
+	public void shutValve() {
+		// synchronized block for visibility
+		synchronized (lock) {
+			shut = true;
+		}
+	}
+
+	/**
+	 * Opens the valve, releasing all buffered events. If the valve is not shut, this does nothing.
+	 *
+	 * <p>The buffered events are handed to the event sender while the lock is held, so that no event
+	 * sent concurrently through {@link #sendEvent} can overtake them. The futures previously returned
+	 * for the buffered events are completed with the sender's results. The completion callbacks are
+	 * attached outside the lock.
+	 */
+	public void openValve() {
+		final ArrayList<FuturePair> futures;
+
+		// send all events under lock, so that no new event can sneak between
+		synchronized (lock) {
+			if (!shut) {
+				return;
+			}
+
+			// NOTE(review): presized by the number of subtasks with buffered events, not by the number
+			// of events; this is only an initial-capacity hint
+			futures = new ArrayList<>(blockedEvents.size());
+
+			for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
+				for (BlockedEvent blockedEvent : eventsForTask) {
+					final CompletableFuture<Acknowledge> ackFuture = eventSender.apply(blockedEvent.event, blockedEvent.subtask);
+					futures.add(new FuturePair(blockedEvent.future, ackFuture));
+				}
+			}
+			blockedEvents.clear();
+			shut = false;
+		}
+
+		// apply the logic on the future outside the lock, to be safe
+		for (FuturePair pair : futures) {
+			final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture;
+			pair.ackFuture.whenComplete((success, failure) -> {
+				if (failure != null) {
+					originalFuture.completeExceptionally(failure);
+				} else {
+					originalFuture.complete(success);
+				}
+			});
+		}
+	}
+
+	/**
+	 * Drops all blocked events for a specific subtask, completing their futures exceptionally.
+	 * This does not change whether the valve is shut.
+	 */
+	public void resetForTask(int subtask) {
+		final List<BlockedEvent> events;
+		synchronized (lock) {
+			events = blockedEvents.remove(subtask);
+		}
+
+		failAllFutures(events);
+	}
+
+	/**
+	 * Resets the valve, dropping all blocked events and opening the valve. The futures of the
+	 * dropped events are completed exceptionally.
+	 */
+	public void reset() {
+		final List<BlockedEvent> events = new ArrayList<>();
+		synchronized (lock) {
+			for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
+				// defensive only: lists are only added through computeIfAbsent, so they are never null
+				if (taskEvents != null) {
+					events.addAll(taskEvents);
+				}
+			}
+			blockedEvents.clear();
+			shut = false;
+		}
+
+		failAllFutures(events);
+	}
+
+	/**
+	 * Completes the futures of the given events exceptionally, all with the same {@link FlinkException}.
+	 * Accepts {@code null} and empty lists. {@code null} is what {@link #resetForTask(int)} passes when
+	 * the subtask had no blocked events.
+	 *
+	 * <p>NOTE(review): the message names a failure of the target task even when the events are dropped
+	 * via {@link #reset()}.
+	 */
+	private static void failAllFutures(@Nullable List<BlockedEvent> events) {
+		if (events == null || events.isEmpty()) {
+			return;
+		}
+
+		final Exception failureCause = new FlinkException("Event discarded due to failure of target task");
+		for (BlockedEvent evt : events) {
+			evt.future.completeExceptionally(failureCause);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** An event held back by the shut valve, with its target subtask and the future returned to the caller. */
+	private static final class BlockedEvent {
+
+		final SerializedValue<OperatorEvent> event;
+		final CompletableFuture<Acknowledge> future;
+		final int subtask;
+
+		BlockedEvent(SerializedValue<OperatorEvent> event, int subtask, CompletableFuture<Acknowledge> future) {
+			this.event = event;
+			this.future = future;
+			this.subtask = subtask;
+		}
+	}
+
+	/**
+	 * Pairs the future that {@link #sendEvent} returned for a buffered event with the future returned
+	 * by the event sender when that event was finally released.
+	 */
+	private static final class FuturePair {
+
+		final CompletableFuture<Acknowledge> originalFuture;
+		final CompletableFuture<Acknowledge> ackFuture;
+
+		FuturePair(CompletableFuture<Acknowledge> originalFuture, CompletableFuture<Acknowledge> ackFuture) {
+			this.originalFuture = originalFuture;
+			this.ackFuture = ackFuture;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 6f2ae47..ce1d8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -962,7 +962,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 			throw new FlinkException("Coordinator of operator " + operator + " does not exist");
 		}
 
-		final OperatorCoordinator coordinator = coordinatorHolder.getCoordinator();
+		final OperatorCoordinator coordinator = coordinatorHolder.coordinator();
 		if (coordinator instanceof CoordinationRequestHandler) {
 			return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
 		} else {
@@ -1002,7 +1002,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 		Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
 		for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
 			for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
-				coordinatorMap.put(holder.getOperatorId(), holder);
+				coordinatorMap.put(holder.operatorId(), holder);
 			}
 		}
 		return coordinatorMap;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index ee292b0..b454633 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -563,11 +563,7 @@ public class PendingCheckpointTest {
 	}
 
 	private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
-		return new OperatorCoordinatorCheckpointContext(
-				new MockOperatorCoordinator(),
-				new OperatorID(),
-				256,
-				50);
+		return new MockOperatorCoordinatorCheckpointContext();
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
new file mode 100644
index 0000000..919e6fe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration Test case that validates the exactly-once mechanism for coordinator events
+ * around checkpoints.
+ *
+ * <p>The test provokes the corner cases of the mechanism described in {@link OperatorCoordinatorHolder}.
+ * <pre>
+ * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
+ * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
+ * </pre>
+ *
+ * <p>The test generates two sequences of events from two Operator Coordinators to two operators (tasks).
+ * The event sequences have a different speed in which they are sent. The coordinators have different
+ * delays in which they complete their checkpoints. Both coordinators inject failures at different points.
+ */
+@SuppressWarnings("serial")
+public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
+
+	/** Task configuration key under which each task finds the name of the accumulator to publish its events to. */
+	private static final ConfigOption<String> ACC_NAME = ConfigOptions.key("acc").stringType().noDefaultValue();
+	private static final String OPERATOR_1_ACCUMULATOR = "op-acc-1";
+	private static final String OPERATOR_2_ACCUMULATOR = "op-acc-2";
+
+	private static MiniCluster miniCluster;
+
+	@BeforeClass
+	public static void startMiniCluster() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(RestOptions.BIND_PORT, "0");
+
+		final MiniClusterConfiguration clusterCfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(2)
+			.setNumSlotsPerTaskManager(1)
+			.setConfiguration(config)
+			.build();
+
+		miniCluster = new MiniCluster(clusterCfg);
+		miniCluster.start();
+	}
+
+	@AfterClass
+	public static void shutdownMiniCluster() throws Exception {
+		miniCluster.close();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void test() throws Exception {
+		final int numEvents1 = 200;
+		final int numEvents2 = 5;
+		final int delay1 = 1;
+		final int delay2 = 200;
+
+		final JobVertex task1 = buildJobVertex("TASK_1", numEvents1, delay1, OPERATOR_1_ACCUMULATOR);
+		final JobVertex task2 = buildJobVertex("TASK_2", numEvents2, delay2, OPERATOR_2_ACCUMULATOR);
+		final JobGraph jobGraph = new JobGraph("Coordinator Events Job", task1, task2);
+		jobGraph.setSnapshotSettings(createCheckpointSettings(task1, task2));
+
+		final JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
+
+		checkListContainsSequence(result.getAccumulatorResult(OPERATOR_2_ACCUMULATOR), numEvents2);
+		checkListContainsSequence(result.getAccumulatorResult(OPERATOR_1_ACCUMULATOR), numEvents1);
+	}
+
+	/**
+	 * Fails the test unless the list is exactly the sequence {@code 0, 1, ..., length - 1}.
+	 */
+	private static void checkListContainsSequence(List<Integer> ints, int length) {
+		if (ints.size() != length) {
+			failList(ints, length);
+		}
+
+		int nextExpected = 0;
+		for (int next : ints) {
+			if (next != nextExpected++) {
+				failList(ints, length);
+			}
+		}
+	}
+
+	private static void failList(List<Integer> ints, int length) {
+		fail("List did not contain expected sequence of " + length + " elements, but was: " + ints);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test setup helpers
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a vertex with parallelism one that runs the {@link EventCollectingTask} and has an
+	 * {@link EventSendingCoordinator} attached, which emits {@code numEvents} integer events (plus a
+	 * final end event), at most one every {@code delay} milliseconds.
+	 */
+	private static JobVertex buildJobVertex(String name, int numEvents, int delay, String accName) throws IOException {
+		final JobVertex vertex = new JobVertex(name);
+		final OperatorID opId = OperatorID.fromJobVertexID(vertex.getID());
+
+		vertex.setParallelism(1);
+		vertex.setInvokableClass(EventCollectingTask.class);
+		vertex.getConfiguration().setString(ACC_NAME, accName);
+
+		final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
+
+			@Override
+			public OperatorID getOperatorId() {
+				return opId;
+			}
+
+			@Override
+			public OperatorCoordinator create(OperatorCoordinator.Context context) {
+				return new EventSendingCoordinator(context, numEvents, delay);
+			}
+		};
+
+		vertex.addOperatorCoordinator(new SerializedValue<>(provider));
+
+		return vertex;
+	}
+
+	/** Checkpoint settings covering all given vertices, with a 10 ms interval and one concurrent checkpoint. */
+	private static JobCheckpointingSettings createCheckpointSettings(JobVertex... vertices) {
+		final List<JobVertexID> ids = Arrays.stream(vertices)
+				.map(JobVertex::getID)
+				.collect(Collectors.toList());
+
+		final CheckpointCoordinatorConfiguration coordCfg =
+			new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
+				.setMaxConcurrentCheckpoints(1)
+				.setCheckpointInterval(10)
+				.setCheckpointTimeout(100_000)
+				.build();
+
+		return new JobCheckpointingSettings(ids, ids, ids, coordCfg, null);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test operator and coordinator implementations
+	// ------------------------------------------------------------------------
+
+	private static final class StartEvent implements OperatorEvent {}
+
+	private static final class EndEvent implements OperatorEvent {}
+
+	private static final class IntegerEvent implements OperatorEvent {
+
+		final int value;
+
+		IntegerEvent(int value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			return "IntegerEvent " + value;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Coordinator that, after receiving a {@link StartEvent} from its task, periodically sends the numbers
+	 * {@code 0 .. numEvents - 1} as {@link IntegerEvent}s followed by an {@link EndEvent}. Each checkpoint
+	 * completes one period after it was picked up and stores the next number to send (capped at numEvents).
+	 * The coordinator fails the job once, when reaching a randomly chosen number.
+	 */
+	private static final class EventSendingCoordinator implements OperatorCoordinator, Runnable {
+
+		private final Context context;
+
+		private final ScheduledExecutorService executor;
+		private volatile ScheduledFuture<?> periodicTask;
+
+		private final int delay;
+		private final int maxNumber;
+		private int nextNumber;
+
+		private volatile CompletableFuture<byte[]> requestedCheckpoint;
+		private CompletableFuture<byte[]> nextToComplete;
+
+		private final int failAtMessage;
+		private boolean failedBefore;
+
+		private EventSendingCoordinator(Context context, int numEvents, int delay) {
+			checkArgument(delay > 0);
+			checkArgument(numEvents >= 3);
+
+			this.context = context;
+			this.maxNumber = numEvents;
+			this.delay = delay;
+			this.executor = Executors.newSingleThreadScheduledExecutor();
+
+			this.failAtMessage = numEvents / 3 + new Random().nextInt(numEvents / 3);
+		}
+
+		@Override
+		public void start() throws Exception {}
+
+		@Override
+		public void close() throws Exception {
+			executor.shutdownNow();
+			executor.awaitTermination(10, TimeUnit.SECONDS);
+		}
+
+		@Override
+		public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+			if (subtask != 0 || !(event instanceof StartEvent)) {
+				throw new Exception(String.format("Don't recognize event '%s' from task %d.", event, subtask));
+			}
+
+			if (periodicTask != null) {
+				throw new Exception("periodic already running");
+			}
+			periodicTask = executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+		}
+
+		@Override
+		public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+			// the subtask may fail before it sent its StartEvent, in which case nothing is scheduled yet
+			final ScheduledFuture<?> task = periodicTask;
+			if (task != null) {
+				task.cancel(false);
+			}
+			periodicTask = null;
+			executor.execute(() -> nextNumber = 0);
+		}
+
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+			executor.execute(() -> nextNumber = bytesToInt(checkpointData));
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			requestedCheckpoint = checkpointFuture;
+			return checkpointFuture;
+		}
+
+		@Override
+		public void checkpointComplete(long checkpointId) {}
+
+		@SuppressWarnings("CallToPrintStackTrace")
+		@Override
+		public void run() {
+			try {
+				handleCheckpoint();
+				sendNextEvent();
+				checkWhetherToTriggerFailure();
+			} catch (Throwable t) {
+				// this is so that exceptions thrown in the scheduled executor don't just freeze the test
+				t.printStackTrace();
+				System.exit(-1);
+			}
+		}
+
+		private void handleCheckpoint() {
+			// we move the checkpoint one further so it completes after the next delay
+			if (nextToComplete != null) {
+				final int numToCheckpoint = Math.min(nextNumber, maxNumber);
+				nextToComplete.complete(intToBytes(numToCheckpoint));
+				nextToComplete = null;
+			}
+			if (requestedCheckpoint != null) {
+				nextToComplete = requestedCheckpoint;
+				requestedCheckpoint = null;
+			}
+		}
+
+		private void sendNextEvent() {
+			if (nextNumber > maxNumber) {
+				return;
+			}
+			try {
+				if (nextNumber == maxNumber) {
+					context.sendEvent(new EndEvent(), 0);
+				} else {
+					context.sendEvent(new IntegerEvent(nextNumber), 0);
+				}
+				nextNumber++;
+			} catch (TaskNotRunningException ignored) {
+				// task is not running (yet); nextNumber is not advanced, so the next run retries the same event
+			}
+		}
+
+		private void checkWhetherToTriggerFailure() {
+			if (nextNumber >= failAtMessage && !failedBefore) {
+				failedBefore = true;
+				context.failJob(new Exception("test failure"));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The runtime task that receives the events and accumulates the numbers. The task is stateful
+	 * and checkpoints the accumulator.
+	 */
+	public static final class EventCollectingTask extends AbstractInvokable {
+
+		private final OperatorID operatorID;
+		private final String accumulatorName;
+		private final LinkedBlockingQueue<Object> actions;
+
+		private volatile boolean running = true;
+
+		public EventCollectingTask(Environment environment) {
+			super(environment);
+			this.operatorID = OperatorID.fromJobVertexID(environment.getJobVertexId());
+			this.accumulatorName = environment.getTaskConfiguration().get(ACC_NAME);
+			this.actions = new LinkedBlockingQueue<>();
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final ArrayList<Integer> collectedInts = new ArrayList<>();
+			restoreState(collectedInts);
+
+			// signal the coordinator to start
+			getEnvironment().getOperatorCoordinatorEventGateway()
+					.sendOperatorEventToCoordinator(operatorID, new SerializedValue<>(new StartEvent()));
+
+			// poor-man's mailbox
+			Object next;
+			while (running && !((next = actions.take()) instanceof EndEvent)) {
+				if (next instanceof IntegerEvent) {
+					collectedInts.add(((IntegerEvent) next).value);
+				} else if (next instanceof CheckpointMetaData) {
+					takeCheckpoint(((CheckpointMetaData) next).getCheckpointId(), collectedInts);
+				} else {
+					throw new Exception("Unrecognized: " + next);
+				}
+			}
+
+			if (running) {
+				final ListAccumulator<Integer> acc = new ListAccumulator<>();
+				collectedInts.forEach(acc::add);
+				getEnvironment().getAccumulatorRegistry().getUserMap().put(accumulatorName, acc);
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			running = false;
+		}
+
+		@Override
+		public Future<Boolean> triggerCheckpointAsync(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				boolean advanceToEndOfEventTime) {
+			actions.add(checkpointMetaData); // this signals the main thread should do a checkpoint
+			return CompletableFuture.completedFuture(true);
+		}
+
+		@Override
+		public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+			try {
+				final OperatorEvent opEvent = event.deserializeValue(getUserCodeClassLoader());
+				actions.add(opEvent);
+			} catch (IOException | ClassNotFoundException e) {
+				throw new FlinkException(e);
+			}
+		}
+
+		private void takeCheckpoint(long checkpointId, List<Integer> state) throws Exception {
+			final StreamStateHandle handle = stateToHandle(state);
+			final TaskStateSnapshot snapshot = createSnapshot(handle, operatorID);
+			getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), snapshot);
+		}
+
+		private void restoreState(List<Integer> target) throws Exception {
+			final StreamStateHandle stateHandle = readSnapshot(getEnvironment().getTaskStateManager(), operatorID);
+			if (stateHandle != null) {
+				final List<Integer> list = handleToState(stateHandle);
+				target.addAll(list);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  serialization shenanigans
+	// ------------------------------------------------------------------------
+
+	static byte[] intToBytes(int value) {
+		final byte[] bytes = new byte[4];
+		ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putInt(0, value);
+		return bytes;
+	}
+
+	static int bytesToInt(byte[] bytes) {
+		assertEquals(4, bytes.length);
+		return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt(0);
+	}
+
+	static ByteStreamStateHandle stateToHandle(List<Integer> state) throws IOException {
+		final byte[] bytes = InstantiationUtil.serializeObject(state);
+		return new ByteStreamStateHandle("state", bytes);
+	}
+
+	static List<Integer> handleToState(StreamStateHandle handle) throws IOException, ClassNotFoundException {
+		final ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) handle;
+		return InstantiationUtil.deserializeObject(byteHandle.getData(), EventCollectingTask.class.getClassLoader());
+	}
+
+	static TaskStateSnapshot createSnapshot(StreamStateHandle handle, OperatorID operatorId) {
+		final OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(
+			new long[]{0}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
+
+		final OperatorStateHandle state = new OperatorStreamStateHandle(
+			Collections.singletonMap("état_et_moi_:_ça_fait_deux", metaInfo), handle);
+
+		final OperatorSubtaskState oss = new OperatorSubtaskState(
+			StateObjectCollection.singleton(state), StateObjectCollection.empty(),
+			StateObjectCollection.empty(), StateObjectCollection.empty());
+		return new TaskStateSnapshot(Collections.singletonMap(operatorId, oss));
+	}
+
+	@Nullable
+	static StreamStateHandle readSnapshot(TaskStateManager stateManager, OperatorID operatorId) {
+		final PrioritizedOperatorSubtaskState poss = stateManager.prioritizedOperatorState(operatorId);
+		if (!poss.isRestored()) {
+			return null;
+		}
+
+		final StateObjectCollection<OperatorStateHandle> opState =
+			poss.getPrioritizedManagedOperatorState().get(0);
+		final OperatorStateHandle handle = Iterators.getOnlyElement(opState.iterator());
+		return handle.getDelegateStateHandle();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
index 16acbb2..7cab82f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
@@ -16,61 +16,65 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-
-import java.util.Collection;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}.
  */
-public final class OperatorCoordinatorCheckpointContext {
-
-	private final OperatorCoordinator coordinator;
+public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
 
 	private final OperatorID operatorId;
-
+	private final OperatorCoordinator coordinator;
+	private final int parallelism;
 	private final int maxParallelism;
 
-	private final int currentParallelism;
+	public MockOperatorCoordinatorCheckpointContext() {
+		this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+	}
 
-	public OperatorCoordinatorCheckpointContext(
-			OperatorCoordinator coordinator,
+	public MockOperatorCoordinatorCheckpointContext(
 			OperatorID operatorId,
-			int maxParallelism,
-			int currentParallelism) {
-
-		this.coordinator = checkNotNull(coordinator);
-		this.operatorId = checkNotNull(operatorId);
+			OperatorCoordinator coordinator,
+			int parallelism,
+			int maxParallelism) {
+		this.operatorId = operatorId;
+		this.coordinator = coordinator;
+		this.parallelism = parallelism;
 		this.maxParallelism = maxParallelism;
-		this.currentParallelism = currentParallelism;
 	}
 
+	@Override
 	public OperatorCoordinator coordinator() {
 		return coordinator;
 	}
 
+	@Override
 	public OperatorID operatorId() {
 		return operatorId;
 	}
 
+	@Override
 	public int maxParallelism() {
 		return maxParallelism;
 	}
 
+	@Override
 	public int currentParallelism() {
-		return currentParallelism;
+		return parallelism;
 	}
 
-	public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
-		return infos.stream()
-			.map(OperatorCoordinatorCheckpointContext::operatorId)
-			.collect(Collectors.toList());
-	}
+	@Override
+	public void onCallTriggerCheckpoint(long checkpointId) {}
+
+	@Override
+	public void onCheckpointStateFutureComplete(long checkpointId) {}
+
+	@Override
+	public void afterSourceBarrierInjection(long checkpointId) {}
+
+	@Override
+	public void abortCurrentTriggering() {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 14a136d..8e5404a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -505,11 +505,11 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final Optional<OperatorCoordinatorHolder> coordinatorOptional = vertexWithCoordinator
 				.getOperatorCoordinators()
 				.stream()
-				.filter((holder) -> holder.getOperatorId().equals(testOperatorId))
+				.filter((holder) -> holder.operatorId().equals(testOperatorId))
 				.findFirst();
 		assertTrue("vertex does not contain coordinator", coordinatorOptional.isPresent());
 
-		final OperatorCoordinator coordinator = coordinatorOptional.get().getCoordinator();
+		final OperatorCoordinator coordinator = coordinatorOptional.get().coordinator();
 		assertThat(coordinator, instanceOf(TestingOperatorCoordinator.class));
 
 		return (TestingOperatorCoordinator) coordinator;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
new file mode 100644
index 0000000..9b0325d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link OperatorEventValve}.
+ */
+public class OperatorEventValveTest {
+
+	@Test
+	public void eventsPassThroughOpenValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+
+		final OperatorEvent event = new TestOperatorEvent();
+		final CompletableFuture<Acknowledge> future = valve.sendEvent(new SerializedValue<>(event), 11);
+
+		assertThat(sender.events, contains(new EventWithSubtask(event, 11)));
+		assertTrue(future.isDone());
+	}
+
+	@Test
+	public void eventsBlockedByClosedValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final CompletableFuture<Acknowledge> future =
+				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+
+		assertTrue(sender.events.isEmpty());
+		assertFalse(future.isDone());
+	}
+
+	@Test
+	public void eventsReleasedAfterOpeningValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final OperatorEvent event1 = new TestOperatorEvent();
+		final OperatorEvent event2 = new TestOperatorEvent();
+		final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 3);
+		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 0);
+
+		valve.openValve();
+
+		assertThat(sender.events, containsInAnyOrder(
+			new EventWithSubtask(event1, 3),
+			new EventWithSubtask(event2, 0)
+		));
+		assertTrue(future1.isDone());
+		assertTrue(future2.isDone());
+	}
+
+	@Test
+	public void releasedEventsForwardSendFailures() throws Exception {
+		final TestEventSender sender = new TestEventSender(new FlinkException("test"));
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final CompletableFuture<Acknowledge> future =
+				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
+		valve.openValve();
+
+		assertTrue(future.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetDropsAllEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final CompletableFuture<Acknowledge> future1 =
+				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
+		final CompletableFuture<Acknowledge> future2 =
+				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+
+		valve.reset();
+		valve.openValve();
+
+		assertTrue(sender.events.isEmpty());
+		// dropped events must fail their futures instead of leaving the senders waiting forever
+		assertTrue(future1.isCompletedExceptionally());
+		assertTrue(future2.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetForTaskDropsSelectiveEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final OperatorEvent event1 = new TestOperatorEvent();
+		final OperatorEvent event2 = new TestOperatorEvent();
+		final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 0);
+		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 1);
+
+		valve.resetForTask(1);
+		valve.openValve();
+
+		assertThat(sender.events, contains(new EventWithSubtask(event1, 0)));
+		assertTrue(future1.isDone());
+		assertTrue(future2.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetOpensValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+
+		valve.shutValve();
+		valve.reset();
+
+		assertFalse(valve.isShut());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
new file mode 100644
index 0000000..8b93dea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * A test implementation of the BiFunction interface used as the underlying event sender in the
+ * {@link OperatorCoordinatorHolder}.
+ */
+final class TestEventSender
+		implements BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> {
+
+	final ArrayList<EventWithSubtask> events = new ArrayList<>(); // every event passed to apply(), in call order
+
+	@Nullable
+	private final Throwable failureCause; // if non-null, every returned send-future fails with it
+
+	/**
+	 * Creates a sender that collects events and acknowledges all events successfully.
+	 */
+	TestEventSender() {
+		this(null);
+	}
+
+	/**
+	 * Creates a sender that collects events and fails all the send-futures with the given exception,
+	 * if it is non-null.
+	 */
+	TestEventSender(@Nullable Throwable failureCause) {
+		this.failureCause = failureCause;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> apply(SerializedValue<OperatorEvent> event, Integer subtask) {
+		final OperatorEvent deserializedEvent;
+		try {
+			deserializedEvent = event.deserializeValue(getClass().getClassLoader()); // so tests can compare via equals()
+		} catch (IOException | ClassNotFoundException e) {
+			throw new AssertionError(e); // an event that cannot be deserialized is treated as a test error
+		}
+		events.add(new EventWithSubtask(deserializedEvent, subtask)); // recorded even if the future is failed below
+
+		return failureCause == null
+				? CompletableFuture.completedFuture(Acknowledge.get())
+				: FutureUtils.completedExceptionally(failureCause);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A combination of an {@link OperatorEvent} and the target subtask it is sent to.
+	 */
+	static final class EventWithSubtask {
+
+		final OperatorEvent event;
+		final int subtask;
+
+		EventWithSubtask(OperatorEvent event, int subtask) {
+			this.event = event;
+			this.subtask = subtask;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final EventWithSubtask that = (EventWithSubtask) o;
+			return subtask == that.subtask &&
+				event.equals(that.event); // relies on the event type's own equals()
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(event, subtask);
+		}
+
+		@Override
+		public String toString() {
+			return event + " => subtask " + subtask;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
index 6eb600b..8622143 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
@@ -23,4 +23,37 @@ package org.apache.flink.runtime.operators.coordination;
  */
 public final class TestOperatorEvent implements OperatorEvent {
 	private static final long serialVersionUID = 1L;
+
+	private final int value; // sole state: equals, hashCode and toString are all based on it
+
+	public TestOperatorEvent() {
+		// pick some random and rather unique value
+		this.value = System.identityHashCode(this); // NOTE: not guaranteed unique; two events may compare equal
+	}
+
+	public TestOperatorEvent(int value) {
+		this.value = value;
+	}
+
+	public int getValue() {
+		return value;
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this || (
+				obj != null &&
+				obj.getClass() == TestOperatorEvent.class &&
+					((TestOperatorEvent) obj).value == this.value);
+	}
+
+	@Override
+	public String toString() {
+		return "TestOperatorEvent (" + value + ')';
+	}
 }


[flink] 06/11: [FLINK-16986][coordination][refactor] Change executor in OperatorCoordinatorSchedulerTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ceee2395330c9c723ae83a85de2cf44f8378efc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 08:55:36 2020 +0200

    [FLINK-16986][coordination][refactor] Change executor in OperatorCoordinatorSchedulerTest
    
    This prepares the test to be ready to run with proper main-thread-execution in the
    OperatorCoordinators.
---
 .../OperatorCoordinatorSchedulerTest.java          | 41 +++++++++++++++++++---
 .../coordination/TestingOperatorCoordinator.java   |  8 +++++
 .../runtime/scheduler/SchedulerTestingUtils.java   | 10 ++++--
 3 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 8e5404a..6ec4dd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -64,6 +65,7 @@ import java.util.Collections;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
@@ -204,6 +206,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final OperatorCoordinator.Context context = getCoordinator(scheduler).getContext();
 
 		final CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0);
+		executor.triggerAll(); // process event sending
 
 		assertThat(result, futureFailedWith(TaskNotRunningException.class));
 	}
@@ -214,6 +217,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 
 		final OperatorCoordinator.Context context = getCoordinator(scheduler).getContext();
 		final CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0);
+		executor.triggerAll();  // process event sending
 
 		assertThat(result, futureFailedWith(TestException.class));
 	}
@@ -482,7 +486,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		}
 
 		final DefaultScheduler scheduler = schedulerBuilder.build();
-		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+		final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(
+			(ScheduledExecutorService) executor, Thread.currentThread());
+		scheduler.setMainThreadExecutor(mainThreadExecutor);
 
 		this.createdScheduler = scheduler;
 		return scheduler;
@@ -529,7 +536,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 
 	private void failAndRedeployTask(DefaultScheduler scheduler, int subtask) {
 		failTask(scheduler, subtask);
+
+		executor.triggerAll();
 		executor.triggerScheduledTasks();
+		executor.triggerAll();
 
 		// guard the test assumptions: This must lead to a restarting and redeploying
 		assertEquals(ExecutionState.DEPLOYING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
@@ -547,8 +557,13 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		scheduler.handleGlobalFailure(reason);
 		SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
 
-		executor.triggerScheduledTasks();   // this handles the restart / redeploy
+		// make sure we propagate all asynchronous and delayed actions
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
+		executor.triggerAll();
+
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+		executor.triggerAll();
 
 		// guard the test assumptions: This must bring the tasks back to RUNNING
 		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
@@ -564,7 +579,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 
 	private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
 		final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
-		executor.triggerAll();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		// The Checkpoint Coordinator executes parts of the logic in its timer thread, and delegates some calls
+		// to the scheduler executor, so we need to mix waiting for the timer thread with working off
+		// tasks in the scheduler executor.
+		// This loop can be dropped once the CheckpointCoordinator also runs in a 'main thread executor'.
+		while (!(coordinator.hasTriggeredCheckpoint() || future.isDone())) {
+			executor.triggerAll();
+			Thread.sleep(1); // NOTE(review): no timeout - a stuck checkpoint hangs the test instead of failing it
+		}
+
+		return future;
 	}
 
@@ -585,7 +610,15 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		acknowledgeCurrentCheckpoint(scheduler);
 
 		// wait until checkpoint has completed
-		return checkpointFuture.get().getCheckpointID();
+		final long checkpointId = checkpointFuture.get().getCheckpointID();
+
+		// now wait until it has been acknowledged
+		while (!testingOperatorCoordinator.hasCompleteCheckpoint()) {
+			executor.triggerAll();
+			Thread.sleep(1);
+		}
+
+		return checkpointId;
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 5d93bcc..e914afe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -121,10 +121,18 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 		return triggeredCheckpoints.take();
 	}
 
+	public boolean hasTriggeredCheckpoint() {
+		return !triggeredCheckpoints.isEmpty(); // non-blocking check, unlike the take()-based getter above
+	}
+
 	public long getLastCheckpointComplete() throws InterruptedException {
 		return lastCheckpointComplete.take();
 	}
 
+	public boolean hasCompleteCheckpoint() {
+		return !lastCheckpointComplete.isEmpty(); // non-blocking; isEmpty() cannot be interrupted, unlike take()
+	}
+
 	// ------------------------------------------------------------------------
 	//  The provider for this coordinator implementation
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 6f62067..1686c5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -90,6 +90,7 @@ import java.util.stream.StreamSupport;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -265,9 +266,12 @@ public class SchedulerTestingUtils {
 
 	public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
 		final JobID jid = scheduler.getJobId();
-		getAllCurrentExecutionAttempts(scheduler).forEach(
-			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
-		);
+		for (final ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) { // each attempt must accept CANCELED
+			final boolean setToCancelled = scheduler.updateTaskExecutionState(
+					new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED));
+
+			assertTrue("could not switch task to CANCELED", setToCancelled);
+		}
 	}
 
 	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {


[flink] 01/11: [hotfix][runtime] Fix log message for web.log.file to only print config key and not deprecated keys

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b66d16f475c6a83c527205c673e308e72ee39adc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 11:12:33 2020 +0200

    [hotfix][runtime] Fix log message for web.log.file to only print config key and not deprecated keys
---
 .../main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index cbf96b5..436d3a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -96,7 +96,7 @@ public final class WebMonitorUtils {
 			if (logFilePath == null || logFilePath.length() < 4) {
 				LOG.warn("JobManager log files are unavailable in the web dashboard. " +
 					"Log file location not found in environment variable '{}' or configuration key '{}'.",
-					logEnv, WebOptions.LOG_PATH);
+					logEnv, WebOptions.LOG_PATH.key());
 				return new LogFileLocation(null, null, null);
 			}
 


[flink] 09/11: [hotfix][coordination] Improve JavaDocs for OperatorCoordinator and OperatorCoordinatorHolder

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4aff693c25d1e489f83a53e2c5b9e1bfb364c3b3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 21:37:27 2020 +0200

    [hotfix][coordination] Improve JavaDocs for OperatorCoordinator and OperatorCoordinatorHolder
---
 .../coordination/OperatorCoordinator.java          | 91 ++++++++++++++++++++--
 .../coordination/OperatorCoordinatorHolder.java    | 68 +++++++++++++---
 2 files changed, 142 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index dc06ae0..cac03d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -77,21 +77,72 @@ public interface OperatorCoordinator extends AutoCloseable {
 
 	// ------------------------------------------------------------------------
 
-	void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
+	/**
+	 * Takes a checkpoint of the coordinator. The checkpoint is identified by the given ID.
+	 *
+	 * <p>To confirm the checkpoint and store state in it, the given {@code CompletableFuture}
+	 * must be completed with the state. To abort or dis-confirm the checkpoint, the given
+	 * {@code CompletableFuture} must be completed exceptionally.
+	 * In any case, the given {@code CompletableFuture} must be completed in some way, otherwise the
+	 * checkpoint will not progress.
+	 *
+	 * <h3>Exactly-once Semantics</h3>
+	 *
+	 * <p>The semantics are defined as follows:
+	 * <ul>
+	 *   <li>The point in time when the checkpoint future is completed is considered the point in time
+	 *       when the coordinator's checkpoint takes place.
+	 *   <li>The OperatorCoordinator implementation must have a way of strictly ordering the sending
+	 *       of events and the completion of the checkpoint future (for example the same thread does
+	 *       both actions, or both actions are guarded by a mutex).
+	 *   <li>Every event sent before the checkpoint future is completed is considered before the checkpoint.
+	 *   <li>Every event sent after the checkpoint future is completed is considered to be after the checkpoint.
+	 * </ul>
+	 */
+	void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception;
 
 	/**
 	 * Notifies the coordinator that the checkpoint with the given checkpointId completes and
 	 * was committed.
 	 *
-	 * <p><b>Important:</b> This method is not supposed to throw an exception, because by the
-	 * time we notify that the checkpoint is complete, the checkpoint is committed and cannot be
-	 * aborted any more. If the coordinator gets into an inconsistent state internally, it should
-	 * fail the job ({@link Context#failJob(Throwable)}) instead. Any exception propagating from
-	 * this method may be treated as a fatal error for the JobManager, crashing the JobManager,
-	 * and leading to an expensive "master failover" procedure.
+	 * <h3>Checkpoint Subsuming</h3>
+	 *
+	 * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes
+	 * a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the
+	 * code should treat all checkpoints with lower ID (T-1, T-2, etc.) also as confirmed.
+	 *
+	 * <h3>Exceptions</h3>
+	 *
+	 * <p>This method is not supposed to throw an exception indicating that the checkpoint cannot
+	 * be completed. By the time we notify that the checkpoint is complete, the checkpoint is
+	 * committed and cannot be aborted any more.
+	 *
+	 * <p>If the coordinator gets into an inconsistent state internally, as a result of logic that
+	 * runs after this notification, it should fail the job ({@link Context#failJob(Throwable)})
+	 * instead. Any exception propagating from this method may be treated as a fatal error for the
+	 * JobManager, crashing the JobManager, and leading to an expensive "master failover" procedure.
 	 */
 	void checkpointComplete(long checkpointId);
 
+	/**
+	 * Resets the coordinator to the given checkpoint.
+	 * When this method is called, the coordinator can discard all other in-flight working state.
+	 * All subtasks will also have been reset to the same checkpoint.
+	 *
+	 * <p>This method is expected to behave synchronously with respect to other method calls and calls
+	 * to {@code Context} methods. For example, events being sent by the coordinator after this method
+	 * returns are assumed to take place after the checkpoint that was restored.
+	 *
+	 * <h3>Restoring implicitly notifies of Checkpoint Completion</h3>
+	 *
+	 * <p>Restoring to a checkpoint is a way of confirming that the checkpoint is complete.
+	 * It is safe to commit side-effects that are predicated on checkpoint completion after this
+	 * call.
+	 *
+	 * <p>Even if no call to {@link #checkpointComplete(long)} happened, the checkpoint can still be
+	 * complete (for example when a system failure happened directly after committing the checkpoint,
+	 * before calling the {@link #checkpointComplete(long)} method).
+	 */
 	void resetToCheckpoint(byte[] checkpointData) throws Exception;
 
 	// ------------------------------------------------------------------------
@@ -103,14 +154,34 @@ public interface OperatorCoordinator extends AutoCloseable {
 	 */
 	interface Context {
 
+		/**
+		 * Gets the ID of the operator to which the coordinator belongs.
+		 */
 		OperatorID getOperatorId();
 
+		/**
+		 * Sends an event to the parallel subtask with the given subtask index.
+		 *
+		 * <p>The returned future is completed successfully once the event has been received
+		 * by the target TaskManager. The future is completed exceptionally if the event cannot be sent.
+		 * That includes situations where the target task is not running.
+		 */
 		CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) throws TaskNotRunningException;
 
 		void failTask(int subtask, Throwable cause);
 
+		/**
+		 * Fails the job and triggers a global failover operation.
+		 *
+		 * <p>This operation restores the entire job to the latest complete checkpoint. This
+		 * is useful to recover from inconsistent situations (the view from the coordinator and its
+		 * subtasks has diverged), but is expensive and should be used with care.
+		 */
 		void failJob(Throwable cause);
 
+		/**
+		 * Gets the current parallelism with which this operator is executed.
+		 */
 		int currentParallelism();
 	}
 
@@ -126,8 +197,14 @@ public interface OperatorCoordinator extends AutoCloseable {
 	 */
 	interface Provider extends Serializable {
 
+		/**
+		 * Gets the ID of the operator to which the coordinator belongs.
+		 */
 		OperatorID getOperatorId();
 
+		/**
+		 * Creates the {@code OperatorCoordinator}, using the given context.
+		 */
 		OperatorCoordinator create(Context context);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 321b401..e11bbb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -44,18 +44,66 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A holder for an {@link OperatorCoordinator.Context} and all the necessary facility around it that
- * is needed to interaction between the Coordinator, the Scheduler, the Checkpoint Coordinator, etc.
+ * The {@code OperatorCoordinatorHolder} holds the {@link OperatorCoordinator} and manages all its
+ * interactions with the remaining components.
+ * It provides the context and is responsible for checkpointing and exactly once semantics.
  *
- * <p>The holder is itself a {@link OperatorCoordinator} and forwards all calls to the actual coordinator.
- * That way, we can make adjustments to assumptions about the threading model and message/call forwarding
- * without needing to adjust all the call sites that interact with the coordinator.
+ * <h3>Exactly-once Semantics</h3>
  *
- * <p>This is also needed, unfortunately, because we need a lazy two-step initialization:
- * When the execution graph is created, we need to create the coordinators (or the holders, to be specific)
- * because the CheckpointCoordinator is also created in the ExecutionGraph and needs access to them.
- * However, the real Coordinators can only be created after SchedulerNG was created, because they need
- * a reference to it for the failure calls.
+ * <p>The semantics are described under {@link OperatorCoordinator#checkpointCoordinator(long, CompletableFuture)}.
+ *
+ * <h3>Exactly-once Mechanism</h3>
+ *
+ * <p>This implementation can handle one checkpoint being triggered at a time. If another checkpoint
+ * is triggered while the triggering of the first one has not yet completed or been aborted, this class will
+ * throw an exception. That is in line with the capabilities of the Checkpoint Coordinator, which can
+ * handle multiple concurrent checkpoints on the TaskManagers, but only one concurrent triggering phase.
+ *
+ * <p>The mechanism for exactly once semantics is as follows:
+ *
+ * <ul>
+ *   <li>Events pass through a special channel, the {@link OperatorEventValve}. If we are not currently
+ *       triggering a checkpoint, then events simply pass through.
+ *   <li>Atomically, with the completion of the checkpoint future for the coordinator, the
+ *       operator event valve is closed. Events coming after that are held back (buffered), because
+ *       they belong to the epoch after the checkpoint.
+ *   <li>Once all coordinators in the job have completed the checkpoint, the barriers to the sources
+ *       are injected. After that (see {@link #afterSourceBarrierInjection(long)}) the valves are
+ *       opened again and the events are sent.
+ *   <li>If a task fails in the meantime, the events are dropped from the valve. From the coordinator's
+ *       perspective, these events are lost, because they were sent to a failed subtask after its latest
+ *       complete checkpoint.
+ * </ul>
+ *
+ * <p><b>IMPORTANT:</b> A critical assumption is that all events from the scheduler to the Tasks are
+ * transported strictly in order. Events being sent from the coordinator after the checkpoint barrier
+ * was injected must not overtake the checkpoint barrier. This is currently guaranteed by Flink's
+ * RPC mechanism.
+ *
+ * <p>Consider this example:
+ * <pre>
+ * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
+ * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
+ * </pre>
+ *
+ * <p>Two coordinators trigger checkpoints at the same time. 'Coordinator Two' takes longer to complete,
+ * and in the meantime 'Coordinator One' sends more events.
+ *
+ * <p>'Coordinator One' emits events 'c' and 'd' after it finished its checkpoint, meaning the events must
+ * take place after the checkpoint. But they are before the barrier injection, meaning the runtime
+ * task would see them before the checkpoint, if they were immediately transported.
+ *
+ * <p>'Coordinator One' closes its valve as soon as the checkpoint future completes. Events 'c' and 'd'
+ * get held back in the valve. Once 'Coordinator Two' completes its checkpoint, the barriers are sent
+ * to the sources. Then the valves are opened, and events 'c' and 'd' can flow to the tasks where they
+ * are received after the barrier.
+ *
+ * <h3>Concurrency and Threading Model</h3>
+ *
+ * <p>This component runs mainly in a main-thread-executor, like RPC endpoints. However,
+ * some actions need to be triggered synchronously by other threads. Most notably, when the
+ * checkpoint future is completed by the {@code OperatorCoordinator} implementation, we need to
+ * synchronously suspend event-sending.
  */
 public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
 


[flink] 10/11: [hotfix][coordination] Remove unused class ExecutionJobVertexCoordinatorContext

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d046fea0019bfc112ae90b42a31ecfecf4887e16
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 21:46:36 2020 +0200

    [hotfix][coordination] Remove unused class ExecutionJobVertexCoordinatorContext
    
    This class was left over from a prior refactoring.
---
 .../ExecutionJobVertexCoordinatorContext.java      | 85 ----------------------
 1 file changed, 85 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
deleted file mode 100644
index c9e99de..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * An implementation of the {@link OperatorCoordinator.Context} that delegates call to an
- * {@link ExecutionJobVertex}.
- */
-final class ExecutionJobVertexCoordinatorContext implements OperatorCoordinator.Context {
-
-	private final OperatorID operatorId;
-
-	private final ExecutionJobVertex jobVertex;
-
-	ExecutionJobVertexCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
-		this.operatorId = operatorId;
-		this.jobVertex = jobVertex;
-	}
-
-	@Override
-	public OperatorID getOperatorId() {
-		return operatorId;
-	}
-
-	@Override
-	public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) {
-		final SerializedValue<OperatorEvent> serializedEvent;
-		try {
-			serializedEvent = new SerializedValue<>(evt);
-		}
-		catch (IOException e) {
-			// we do not expect that this exception is handled by the caller, so we make it
-			// unchecked so that it can bubble up
-			throw new FlinkRuntimeException("Cannot serialize operator event", e);
-		}
-
-		return getTaskExecution(targetSubtask).sendOperatorEvent(operatorId, serializedEvent);
-	}
-
-	@Override
-	public void failTask(int subtask, Throwable cause) {
-		final Execution taskExecution = getTaskExecution(subtask);
-		taskExecution.fail(cause);
-	}
-
-	@Override
-	public void failJob(Throwable cause) {
-		jobVertex.getGraph().failGlobal(cause);
-	}
-
-	@Override
-	public int currentParallelism() {
-		return jobVertex.getParallelism();
-	}
-
-	private Execution getTaskExecution(int subtask) {
-		return jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
-	}
-}


[flink] 03/11: [hotfix][checkpointing] Improve exception in case Coordinator State ack fails

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b144f13a2cc6dcebe38425ebac81a56cbb6e6f7d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 19:43:49 2020 +0200

    [hotfix][checkpointing] Improve exception in case Coordinator State ack fails
---
 .../runtime/checkpoint/OperatorCoordinatorCheckpoints.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 2423bcb..39ac10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -105,8 +105,14 @@ final class OperatorCoordinatorCheckpoints {
 				checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
 
 			if (result != PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) {
-				throw new CheckpointException("Coordinator state not acknowledged successfully: " + result,
-					CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
+				final String errorMessage = "Coordinator state not acknowledged successfully: " + result;
+				final Throwable error = checkpoint.isDiscarded() ? checkpoint.getFailureCause() : null;
+
+				if (error != null) {
+					throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, error);
+				} else {
+					throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
+				}
 			}
 		}
 	}