Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 16:27:57 UTC

[flink] branch master updated (6e688e1 -> e01cab2)

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6e688e1  [FLINK-13783][table] Implement type inference for string functions
     new 24201cc  [FLINK-18039][connector/common] Introduce a RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator instance when resetToCheckpoint() is invoked. Let SourceCoordinator leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint reset.
     new e01cab2  [FLINK-18039] Ensure the source events are sent via the coordinator thread.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../RecreateOnResetOperatorCoordinator.java        | 195 +++++++++++++++++++++
 .../source/coordinator/ExecutorNotifier.java       |  25 ++-
 .../source/coordinator/SourceCoordinator.java      |   5 +-
 .../coordinator/SourceCoordinatorContext.java      |  98 +++++++----
 .../coordinator/SourceCoordinatorProvider.java     |  18 +-
 .../RecreateOnResetOperatorCoordinatorTest.java    | 119 +++++++++++++
 .../source/coordinator/ExecutorNotifierTest.java   | 108 ++++++++++++
 .../coordinator/SourceCoordinatorProviderTest.java |  99 +++++++++++
 8 files changed, 615 insertions(+), 52 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
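
Conceptually, commit 24201cc (detailed further down in this thread) wraps the real OperatorCoordinator behind a delegating shell and swaps in a freshly created instance whenever resetToCheckpoint() is invoked, so the restored coordinator never sees stale state. Below is a minimal, framework-free sketch of that delegate-and-swap idea; ResettableWrapper and its Supplier-based factory are invented names for illustration, not Flink classes.

	import java.util.function.Supplier;

	// Illustrative only: delegate everything to a current instance and replace that
	// instance with a brand-new one on reset, mirroring the recreate-on-reset idea.
	public class ResettableWrapper<T extends AutoCloseable> implements AutoCloseable {

		private final Supplier<T> factory;
		private T delegate;

		public ResettableWrapper(Supplier<T> factory) {
			this.factory = factory;
			this.delegate = factory.get();
		}

		public T current() {
			return delegate;
		}

		public void reset() throws Exception {
			// Close the old instance so it can no longer act, then start over cleanly.
			delegate.close();
			delegate = factory.get();
		}

		@Override
		public void close() throws Exception {
			delegate.close();
		}
	}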


[flink] 02/02: [FLINK-18039] Ensure the source events are sent via the coordinator thread.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e01cab2f713802f6fb92f7472a258c07f2c18af7
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Thu Jun 11 17:30:28 2020 +0800

    [FLINK-18039] Ensure the source events are sent via the coordinator thread.
---
 .../coordinator/SourceCoordinatorContext.java      | 87 ++++++++++++++--------
 1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 322778a..b6ceda9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -128,13 +128,15 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 
 	@Override
 	public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
-		try {
-			operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
-		} catch (TaskNotRunningException e) {
-			throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
-					event,
-					subtaskId), e);
-		}
+		callInCoordinatorThread(() -> {
+			try {
+				operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+				return null;
+			} catch (TaskNotRunningException e) {
+				throw new FlinkRuntimeException(
+						String.format("Failed to send event %s to subtask %d", event, subtaskId), e);
+			}
+		}, String.format("Failed to send event %s to subtask %d", event, subtaskId));
 	}
 
 	@Override
@@ -150,34 +152,28 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 	@Override
 	public void assignSplits(SplitsAssignment<SplitT> assignment) {
 		// Ensure the split assignment is done by the the coordinator executor.
-		if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
-			try {
-				coordinatorExecutor.submit(() -> assignSplits(assignment)).get();
-				return;
-			} catch (InterruptedException | ExecutionException e) {
-				throw new FlinkRuntimeException("Failed to assign splits due to", e);
+		callInCoordinatorThread(() -> {
+			// Ensure all the subtasks in the assignment have registered.
+			for (Integer subtaskId : assignment.assignment().keySet()) {
+				if (!registeredReaders.containsKey(subtaskId)) {
+					throw new IllegalArgumentException(String.format(
+							"Cannot assign splits %s to subtask %d because the subtask is not registered.",
+							registeredReaders.get(subtaskId), subtaskId));
+				}
 			}
-		}
-
-		// Ensure all the subtasks in the assignment have registered.
-		for (Integer subtaskId : assignment.assignment().keySet()) {
-			if (!registeredReaders.containsKey(subtaskId)) {
-				throw new IllegalArgumentException(String.format(
-						"Cannot assign splits %s to subtask %d because the subtask is not registered.",
-						registeredReaders.get(subtaskId), subtaskId));
-			}
-		}
 
-		assignmentTracker.recordSplitAssignment(assignment);
-		assignment.assignment().forEach(
-				(id, splits) -> {
-					try {
-						operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
-					} catch (TaskNotRunningException e) {
-						throw new FlinkRuntimeException(String.format(
-								"Failed to assign splits %s to reader %d.", splits, id), e);
-					}
-				});
+			assignmentTracker.recordSplitAssignment(assignment);
+			assignment.assignment().forEach(
+					(id, splits) -> {
+						try {
+							operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+						} catch (TaskNotRunningException e) {
+							throw new FlinkRuntimeException(String.format(
+									"Failed to assign splits %s to reader %d.", splits, id), e);
+						}
+					});
+			return null;
+		}, String.format("Failed to assign splits %s due to ", assignment));
 	}
 
 	@Override
@@ -280,4 +276,29 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 	void onCheckpointComplete(long checkpointId) {
 		assignmentTracker.onCheckpointComplete(checkpointId);
 	}
+
+	// ---------------- private helper methods -----------------
+
+	/**
+	 * A helper method that delegates the callable to the coordinator thread if the
+	 * current thread is not the coordinator thread, otherwise call the callable right away.
+	 *
+	 * @param callable the callable to delegate.
+	 */
+	private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
+		// Ensure the split assignment is done by the coordinator executor.
+		if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+			try {
+				return coordinatorExecutor.submit(callable).get();
+			} catch (InterruptedException | ExecutionException e) {
+				throw new FlinkRuntimeException(errorMessage, e);
+			}
+		}
+
+		try {
+			return callable.call();
+		} catch (Exception e) {
+			throw new FlinkRuntimeException(errorMessage, e);
+		}
+	}
 }
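
Stripped of the Flink-specific types, the change above is a thread-confinement helper: a task that must run on the coordinator thread is wrapped in a Callable and either executed directly (when the caller is already on that thread) or submitted to the single-threaded coordinator executor and awaited. A standalone sketch of the same pattern, with ConfinedExecutor and its method names invented for illustration:

	import java.util.concurrent.Callable;
	import java.util.concurrent.ExecutionException;
	import java.util.concurrent.ExecutorService;
	import java.util.concurrent.Executors;
	import java.util.concurrent.atomic.AtomicReference;

	// Illustrative helper: run a callable on a dedicated single thread, delegating
	// and blocking when the caller is on a different thread.
	public class ConfinedExecutor {

		private final AtomicReference<Thread> workerThread = new AtomicReference<>();
		private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
			Thread t = new Thread(r, "confined-worker");
			workerThread.set(t);
			return t;
		});

		public <V> V call(Callable<V> callable, String errorMessage) {
			if (Thread.currentThread() != workerThread.get()) {
				try {
					// Delegate to the worker thread and block until it finishes.
					return executor.submit(callable).get();
				} catch (InterruptedException | ExecutionException e) {
					throw new RuntimeException(errorMessage, e);
				}
			}
			try {
				// Already on the worker thread; call directly instead of queueing behind ourselves.
				return callable.call();
			} catch (Exception e) {
				throw new RuntimeException(errorMessage, e);
			}
		}

		public void shutdown() {
			executor.shutdown();
		}
	}

Checking the current thread before submitting is what prevents the deadlock that would occur if the worker thread blocked on a task queued behind the one it is already running.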


[flink] 01/02: [FLINK-18039][connector/common] Introduce a RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator instance when resetToCheckpoint() is invoked. Let SourceCoordinator leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint reset.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24201cc1b6c46c689abbff8635a01cec7b088983
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sun Jun 7 08:19:41 2020 +0800

    [FLINK-18039][connector/common] Introduce a RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator instance when resetToCheckpoint() is invoked. Let SourceCoordinator leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint reset.
---
 .../RecreateOnResetOperatorCoordinator.java        | 195 +++++++++++++++++++++
 .../source/coordinator/ExecutorNotifier.java       |  25 ++-
 .../source/coordinator/SourceCoordinator.java      |   5 +-
 .../coordinator/SourceCoordinatorContext.java      |  11 +-
 .../coordinator/SourceCoordinatorProvider.java     |  18 +-
 .../RecreateOnResetOperatorCoordinatorTest.java    | 119 +++++++++++++
 .../source/coordinator/ExecutorNotifierTest.java   | 108 ++++++++++++
 .../coordinator/SourceCoordinatorProviderTest.java |  99 +++++++++++
 8 files changed, 561 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
new file mode 100644
index 0000000..ea91156
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -0,0 +1,195 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A class that will recreate a new {@link OperatorCoordinator} instance when
+ * reset to checkpoint.
+ */
+public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
+
+	private final Provider provider;
+	private QuiesceableContext quiesceableContext;
+	private OperatorCoordinator coordinator;
+
+	private boolean started;
+
+	private RecreateOnResetOperatorCoordinator(
+			QuiesceableContext context,
+			Provider provider) {
+		this.quiesceableContext = context;
+		this.provider = provider;
+		this.coordinator = provider.getCoordinator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		coordinator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		coordinator.close();
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		coordinator.handleEventFromOperator(subtask, event);
+	}
+
+	@Override
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+		coordinator.subtaskFailed(subtask, reason);
+	}
+
+	@Override
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
+		coordinator.checkpointCoordinator(checkpointId, resultFuture);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		coordinator.checkpointComplete(checkpointId);
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		// Quiesce the context so the coordinator cannot interact with the job master anymore.
+		quiesceableContext.quiesce();
+		// Close the coordinator.
+		coordinator.close();
+		// Create a new coordinator and reset to the checkpoint.
+		quiesceableContext = new QuiesceableContext(quiesceableContext.getContext());
+		coordinator = provider.getCoordinator(quiesceableContext);
+		coordinator.resetToCheckpoint(checkpointData);
+		// Start the new coordinator if this coordinator has been started before reset to the checkpoint.
+		if (started) {
+			coordinator.start();
+		}
+	}
+
+	// ---------------------
+
+	@VisibleForTesting
+	public OperatorCoordinator getInternalCoordinator() {
+		return coordinator;
+	}
+
+	@VisibleForTesting
+	QuiesceableContext getQuiesceableContext() {
+		return quiesceableContext;
+	}
+
+	// ---------------------
+
+	public static abstract class Provider implements OperatorCoordinator.Provider {
+		private static final long serialVersionUID = 3002837631612629071L;
+		private final OperatorID operatorID;
+
+		public Provider(OperatorID operatorID) {
+			this.operatorID = operatorID;
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return operatorID;
+		}
+
+		@Override
+		public OperatorCoordinator create(Context context) {
+			QuiesceableContext quiesceableContext = new QuiesceableContext(context);
+			return new RecreateOnResetOperatorCoordinator(quiesceableContext, this);
+		}
+
+		protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context);
+	}
+
+	// ----------------------
+
+	/**
+	 * A wrapper class around the operator coordinator context to allow quiescence.
+	 * When a new operator coordinator is created, we need to quiesce the old
+ * operator coordinator to prevent it from making any further impact on the
+ * job master. This is done by quiescing the operator coordinator context.
+	 * After the quiescence, the "reading" methods will still work, but the
+	 * "writing" methods will become a no-op or fail immediately.
+	 */
+	@VisibleForTesting
+	static class QuiesceableContext implements OperatorCoordinator.Context {
+		private final OperatorCoordinator.Context context;
+		private volatile boolean quiesced;
+
+		QuiesceableContext(OperatorCoordinator.Context context) {
+			this.context = context;
+			quiesced = false;
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return context.getOperatorId();
+		}
+
+		@Override
+		public synchronized CompletableFuture<Acknowledge> sendEvent(
+				OperatorEvent evt,
+				int targetSubtask) throws TaskNotRunningException {
+			// Do not enter the sending procedure if the context has been quiesced.
+			if (quiesced) {
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			}
+			return context.sendEvent(evt, targetSubtask);
+		}
+
+		@Override
+		public synchronized void failJob(Throwable cause) {
+			if (quiesced) {
+				return;
+			}
+			context.failJob(cause);
+		}
+
+		@Override
+		public int currentParallelism() {
+			return context.currentParallelism();
+		}
+
+		@VisibleForTesting
+		synchronized void quiesce() {
+			quiesced = true;
+		}
+
+		@VisibleForTesting
+		boolean isQuiesced() {
+			return quiesced;
+		}
+
+		private OperatorCoordinator.Context getContext() {
+			return context;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index e730a15..c865df5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -25,21 +25,24 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 /**
  * This class is used to coordinate between two components, where one component has an
  * executor following the mailbox model and the other component notifies it when needed.
  */
-public class ExecutorNotifier {
+public class ExecutorNotifier implements AutoCloseable {
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
-	private ScheduledExecutorService workerExecutor;
-	private Executor executorToNotify;
+	private final ScheduledExecutorService workerExecutor;
+	private final Executor executorToNotify;
+	private final AtomicBoolean closed;
 
 	public ExecutorNotifier(ScheduledExecutorService workerExecutor,
 							Executor executorToNotify) {
 		this.executorToNotify = executorToNotify;
 		this.workerExecutor = workerExecutor;
+		this.closed = new AtomicBoolean(false);
 	}
 
 	/**
@@ -134,4 +137,20 @@ public class ExecutorNotifier {
 			}
 		}, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
 	}
+
+	/**
+	 * Close the executor notifier. This is a blocking call which waits for all the
+	 * async calls to finish before it returns.
+	 *
+	 * @throws InterruptedException when interrupted during closure.
+	 */
+	public void close() throws InterruptedException {
+		if (!closed.compareAndSet(false, true)) {
+			LOG.debug("The executor notifier has been closed.");
+			return;
+		}
+		// Shutdown the worker executor, so no more worker tasks can run.
+		workerExecutor.shutdownNow();
+		workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index dc5f1dd..3aa095d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -113,6 +113,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 		boolean successfullyClosed = false;
 		try {
 			if (started) {
+				context.close();
 				enumerator.close();
 			}
 		} finally {
@@ -202,10 +203,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 					"only be reset to a checkpoint before it starts.", operatorName));
 		}
 		LOG.info("Resetting coordinator of source {} from checkpoint.", operatorName);
-		if (started) {
-			enumerator.close();
-		}
-		LOG.info("Resetting SourceCoordinator from checkpoint.");
 		fromBytes(checkpointData);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 47fe564..322778a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
@@ -77,7 +78,8 @@ import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerde
  * @param <SplitT> the type of the splits.
  */
 @Internal
-public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+public class SourceCoordinatorContext<SplitT extends SourceSplit>
+		implements SplitEnumeratorContext<SplitT>, AutoCloseable {
 	private final ExecutorService coordinatorExecutor;
 	private final ExecutorNotifier notifier;
 	private final OperatorCoordinator.Context operatorCoordinatorContext;
@@ -192,6 +194,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> implements Spl
 		notifier.notifyReadyAsync(callable, handler);
 	}
 
+	@Override
+	public void close() throws InterruptedException {
+		notifier.close();
+		coordinatorExecutor.shutdown();
+		coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+	}
+
 	// --------- Package private additional methods for the SourceCoordinator ------------
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 77ad7a3..8ef3f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import java.util.concurrent.Callable;
@@ -33,10 +34,9 @@ import java.util.function.BiConsumer;
 /**
  * The provider of {@link SourceCoordinator}.
  */
-public class SourceCoordinatorProvider<SplitT extends SourceSplit>
-		implements OperatorCoordinator.Provider {
+public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends RecreateOnResetOperatorCoordinator.Provider {
+	private static final long serialVersionUID = -1921681440009738462L;
 	private final String operatorName;
-	private final OperatorID operatorID;
 	private final Source<?, SplitT, ?> source;
 	private final int numWorkerThreads;
 
@@ -56,25 +56,21 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
 			OperatorID operatorID,
 			Source<?, SplitT, ?> source,
 			int numWorkerThreads) {
+		super(operatorID);
 		this.operatorName = operatorName;
-		this.operatorID = operatorID;
 		this.source = source;
 		this.numWorkerThreads = numWorkerThreads;
 	}
 
 	@Override
-	public OperatorID getOperatorId() {
-		return operatorID;
-	}
-
-	@Override
-	public OperatorCoordinator create(OperatorCoordinator.Context context) {
+	public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
 		final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
 		CoordinatorExecutorThreadFactory coordinatorThreadFactory =
 				new CoordinatorExecutorThreadFactory(coordinatorThreadName);
 		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
 		SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
-				new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, context);
+				new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads,
+						context);
 		return new SourceCoordinator<>(operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
new file mode 100644
index 0000000..f4298ce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -0,0 +1,119 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link RecreateOnResetOperatorCoordinator}.
+ */
+public class RecreateOnResetOperatorCoordinatorTest {
+	private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+	private static final int NUM_SUBTASKS = 1;
+
+	@Test
+	public void testQuiesceableContextNotQuiesced() throws TaskNotRunningException {
+		MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+		RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
+				new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);
+
+		TestingEvent event = new TestingEvent();
+		quiesceableContext.sendEvent(event, 0);
+		quiesceableContext.failJob(new Exception());
+
+		assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
+		assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
+		assertEquals(Collections.singletonList(event), context.getEventsToOperatorBySubtaskId(0));
+		assertTrue(context.isJobFailed());
+	}
+
+	@Test
+	public void testQuiescedContext() throws TaskNotRunningException {
+		MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+		RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
+				new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);
+
+		quiesceableContext.quiesce();
+		quiesceableContext.sendEvent(new TestingEvent(), 0);
+		quiesceableContext.failJob(new Exception());
+
+		assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
+		assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
+		assertTrue(context.getEventsToOperator().isEmpty());
+		assertFalse(context.isJobFailed());
+	}
+
+	@Test
+	public void testResetToCheckpoint() throws Exception {
+		TestingCoordinatorProvider provider = new TestingCoordinatorProvider();
+		MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+		RecreateOnResetOperatorCoordinator coordinator = createCoordinator(provider, context);
+
+		RecreateOnResetOperatorCoordinator.QuiesceableContext contextBeforeReset = coordinator.getQuiesceableContext();
+		TestingOperatorCoordinator internalCoordinatorBeforeReset = getInternalCoordinator(coordinator);
+
+		byte[] stateToRestore = new byte[0];
+		coordinator.resetToCheckpoint(stateToRestore);
+
+		assertTrue(contextBeforeReset.isQuiesced());
+		assertNull(internalCoordinatorBeforeReset.getLastRestoredCheckpointState());
+
+		TestingOperatorCoordinator internalCoordinatorAfterReset = getInternalCoordinator(coordinator);
+		assertEquals(stateToRestore, internalCoordinatorAfterReset.getLastRestoredCheckpointState());
+	}
+
+	// ---------------
+
+	private RecreateOnResetOperatorCoordinator createCoordinator(
+			TestingCoordinatorProvider provider,
+			OperatorCoordinator.Context context) {
+		return (RecreateOnResetOperatorCoordinator) provider.create(context);
+	}
+
+	private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) {
+		return (TestingOperatorCoordinator) coordinator.getInternalCoordinator();
+	}
+
+	// ---------------
+
+	private static class TestingCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {
+		private static final long serialVersionUID = 4184184580789587013L;
+
+		public TestingCoordinatorProvider() {
+			super(OPERATOR_ID);
+		}
+
+		@Override
+		protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
+			return new TestingOperatorCoordinator(context);
+		}
+	}
+
+	private static class TestingEvent implements OperatorEvent {
+		private static final long serialVersionUID = -3289352911927668275L;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
new file mode 100644
index 0000000..2412a251
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -0,0 +1,108 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for ExecutorNotifier.
+ */
+public class ExecutorNotifierTest {
+	private ScheduledExecutorService workerExecutor;
+	private ExecutorService executorToNotify;
+	private ExecutorNotifier notifier;
+	private Throwable exceptionInHandler;
+
+	@Before
+	public void setup() {
+		this.exceptionInHandler = null;
+		this.workerExecutor = Executors.newSingleThreadScheduledExecutor(r ->
+				new Thread(r, "worker-thread"));
+		this.executorToNotify = Executors.newSingleThreadExecutor(r -> {
+			Thread t = new Thread(r, "main-thread");
+			t.setUncaughtExceptionHandler((thread, e) -> exceptionInHandler = e);
+			return t;
+		});
+		this.notifier = new ExecutorNotifier(
+				this.workerExecutor,
+				this.executorToNotify);
+	}
+
+	@After
+	public void tearDown() throws InterruptedException {
+		notifier.close();
+		closeExecutorToNotify();
+	}
+
+	@Test
+	public void testBasic() throws InterruptedException {
+		CountDownLatch latch = new CountDownLatch(1);
+		AtomicInteger result = new AtomicInteger(0);
+		notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
+			result.set(v);
+			latch.countDown();
+		});
+		latch.await();
+		closeExecutorToNotify();
+		assertEquals(1234, result.get());
+	}
+
+	@Test
+	public void testExceptionInCallable() {
+		Exception exception = new Exception("Expected exception.");
+		notifier.notifyReadyAsync(
+				() -> {
+					throw exception;
+				},
+				(v, e) -> {
+					assertEquals(exception, e);
+					assertNull(v);
+				});
+	}
+
+	@Test
+	public void testExceptionInHandler() throws InterruptedException {
+		CountDownLatch latch = new CountDownLatch(1);
+		RuntimeException exception =  new RuntimeException("Expected exception.");
+		notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
+			latch.countDown();
+			throw exception;
+		});
+		latch.await();
+		closeExecutorToNotify();
+		assertEquals(exception, exceptionInHandler);
+	}
+
+	private void closeExecutorToNotify() throws InterruptedException {
+		executorToNotify.shutdown();
+		executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
new file mode 100644
index 0000000..c86981c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -0,0 +1,99 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SourceCoordinatorProvider}.
+ */
+public class SourceCoordinatorProviderTest {
+	private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+	private static final int NUM_SPLITS = 10;
+	private SourceCoordinatorProvider<MockSourceSplit> provider;
+
+	@Before
+	public void setup() {
+		provider = new SourceCoordinatorProvider<>(
+				"SourceCoordinatorProviderTest",
+				OPERATOR_ID,
+				new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
+				1);
+	}
+
+	@Test
+	public void testCreate() {
+		OperatorCoordinator coordinator =
+				provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS));
+		assertTrue(coordinator instanceof RecreateOnResetOperatorCoordinator);
+	}
+
+	@Test
+	public void testCheckpointAndReset() throws Exception {
+		final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS);
+		final RecreateOnResetOperatorCoordinator coordinator =
+				(RecreateOnResetOperatorCoordinator) provider.create(context);
+		final SourceCoordinator<?, ?> sourceCoordinator =
+				(SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+
+		// Start the coordinator.
+		coordinator.start();
+		// register reader 0 and take a checkpoint.
+		coordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location"));
+		CompletableFuture<byte[]> future = new CompletableFuture<>();
+		coordinator.checkpointCoordinator(0L, future);
+		byte[] bytes = future.get();
+
+		// Register reader 1.
+		coordinator.handleEventFromOperator(1, new ReaderRegistrationEvent(1, "location"));
+		// Wait until the coordinator context is updated with registration of reader 1.
+		while (sourceCoordinator.getContext().registeredReaders().size() < 2) {
+			Thread.sleep(1);
+		}
+
+		// reset the coordinator to the checkpoint which only contains reader 0.
+		coordinator.resetToCheckpoint(bytes);
+		final SourceCoordinator<?, ?> restoredSourceCoordinator =
+				(SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+		assertNotEquals("The restored source coordinator should be a different instance",
+				restoredSourceCoordinator, sourceCoordinator);
+		assertEquals("There should only be one registered reader.",
+				1, restoredSourceCoordinator.getContext().registeredReaders().size());
+		assertNotNull("The only registered reader should be reader 0",
+				restoredSourceCoordinator.getContext().registeredReaders().get(0));
+	}
+
+}
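
As a usage note, a connector-specific coordinator opts into this behavior by subclassing RecreateOnResetOperatorCoordinator.Provider and implementing getCoordinator(), exactly as SourceCoordinatorProvider does in the patch above; the wrapper then quiesces the old context, closes the old coordinator, and builds a fresh one on every resetToCheckpoint(). A short sketch under that assumption, where MyCoordinatorProvider and MyCoordinator are placeholder names rather than real classes:

	import org.apache.flink.runtime.jobgraph.OperatorID;
	import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
	import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;

	// Placeholder provider; MyCoordinator stands in for any OperatorCoordinator implementation.
	public class MyCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {
		private static final long serialVersionUID = 1L;

		public MyCoordinatorProvider(OperatorID operatorID) {
			super(operatorID);
		}

		@Override
		protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
			// Called once up front and again on every resetToCheckpoint(), so each reset
			// starts from a brand-new coordinator instance.
			return new MyCoordinator(context);
		}
	}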