You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 16:27:58 UTC

[flink] 01/02: [FLINK-18039][connector/common] Introduce a RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator instance when resetToCheckpoint() is invoked. Let SourceCoordinator leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint reset.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24201cc1b6c46c689abbff8635a01cec7b088983
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sun Jun 7 08:19:41 2020 +0800

    [FLINK-18039][connector/common] Introduce a RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator instance when resetToCheckpoint() is invoked. Let SourceCoordinator leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint reset.
---
 .../RecreateOnResetOperatorCoordinator.java        | 195 +++++++++++++++++++++
 .../source/coordinator/ExecutorNotifier.java       |  25 ++-
 .../source/coordinator/SourceCoordinator.java      |   5 +-
 .../coordinator/SourceCoordinatorContext.java      |  11 +-
 .../coordinator/SourceCoordinatorProvider.java     |  18 +-
 .../RecreateOnResetOperatorCoordinatorTest.java    | 119 +++++++++++++
 .../source/coordinator/ExecutorNotifierTest.java   | 108 ++++++++++++
 .../coordinator/SourceCoordinatorProviderTest.java |  99 +++++++++++
 8 files changed, 561 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
new file mode 100644
index 0000000..ea91156
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -0,0 +1,195 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link OperatorCoordinator} that delegates all calls to an internal coordinator and,
+ * when {@link #resetToCheckpoint(byte[])} is invoked, replaces that internal coordinator
+ * with a new instance obtained from its {@link Provider} instead of resetting it in place.
+ *
+ * <p>Before the old internal coordinator is closed, its {@link QuiesceableContext} is
+ * quiesced so it can no longer send events or fail the job. The new internal coordinator
+ * receives a fresh {@link QuiesceableContext} wrapping the same underlying context, is reset
+ * to the checkpoint data, and is started only if this coordinator had been started before.
+ *
+ * <p>NOTE(review): the mutable fields are not synchronized; this presumably relies on all
+ * methods being invoked from a single thread — confirm against the caller.
+ */
+public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
+
+	/** Creates internal coordinators: once on construction and once more on every reset. */
+	private final Provider provider;
+	/** The context handed to the current internal coordinator; replaced on every reset. */
+	private QuiesceableContext quiesceableContext;
+	/** The current internal coordinator that all calls are delegated to. */
+	private OperatorCoordinator coordinator;
+
+	/** Whether {@link #start()} has completed, so that a recreated coordinator is started too. */
+	private boolean started;
+
+	// Private: instances are created through Provider#create(Context).
+	private RecreateOnResetOperatorCoordinator(
+			QuiesceableContext context,
+			Provider provider) {
+		this.quiesceableContext = context;
+		this.provider = provider;
+		this.coordinator = provider.getCoordinator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		coordinator.start();
+		// Only recorded once the internal coordinator started successfully.
+		started = true;
+	}
+
+	// NOTE(review): unlike resetToCheckpoint(), close() does not quiesce the context first.
+	@Override
+	public void close() throws Exception {
+		coordinator.close();
+	}
+
+	// The following methods simply forward to the current internal coordinator.
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		coordinator.handleEventFromOperator(subtask, event);
+	}
+
+	@Override
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+		coordinator.subtaskFailed(subtask, reason);
+	}
+
+	@Override
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
+		coordinator.checkpointCoordinator(checkpointId, resultFuture);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		coordinator.checkpointComplete(checkpointId);
+	}
+
+	/**
+	 * Replaces the internal coordinator with a newly created one restored from the given
+	 * checkpoint data. The steps run in order and are not rolled back: if one of them throws,
+	 * the later steps are skipped (e.g. a failure while closing the old coordinator leaves
+	 * the quiesced context and the old coordinator in place).
+	 */
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		// Quiesce the context so the coordinator cannot interact with the job master anymore.
+		quiesceableContext.quiesce();
+		// Close the coordinator.
+		coordinator.close();
+		// Create a new coordinator and reset to the checkpoint.
+		// The new context wraps the same underlying context, but is not quiesced.
+		quiesceableContext = new QuiesceableContext(quiesceableContext.getContext());
+		coordinator = provider.getCoordinator(quiesceableContext);
+		coordinator.resetToCheckpoint(checkpointData);
+		// Start the new coordinator if this coordinator has been started before reset to the checkpoint.
+		if (started) {
+			coordinator.start();
+		}
+	}
+
+	// ---------------------
+
+	/** Returns the current internal coordinator; changes after every reset. */
+	@VisibleForTesting
+	public OperatorCoordinator getInternalCoordinator() {
+		return coordinator;
+	}
+
+	/** Returns the context of the current internal coordinator; changes after every reset. */
+	@VisibleForTesting
+	QuiesceableContext getQuiesceableContext() {
+		return quiesceableContext;
+	}
+
+	// ---------------------
+
+	/**
+	 * Base provider whose {@link #create(Context)} wraps the coordinators built by
+	 * {@link #getCoordinator(OperatorCoordinator.Context)} in a
+	 * {@link RecreateOnResetOperatorCoordinator}.
+	 */
+	public static abstract class Provider implements OperatorCoordinator.Provider {
+		private static final long serialVersionUID = 3002837631612629071L;
+		private final OperatorID operatorID;
+
+		public Provider(OperatorID operatorID) {
+			this.operatorID = operatorID;
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return operatorID;
+		}
+
+		@Override
+		public OperatorCoordinator create(Context context) {
+			// Wrap the given context so the internal coordinator can later be muted on reset.
+			QuiesceableContext quiesceableContext = new QuiesceableContext(context);
+			return new RecreateOnResetOperatorCoordinator(quiesceableContext, this);
+		}
+
+		/**
+		 * Creates a new internal coordinator bound to the given context. This is called once
+		 * when the wrapping coordinator is created and again on every reset, each time with
+		 * a new {@link QuiesceableContext}.
+		 */
+		protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context);
+	}
+
+	// ----------------------
+
+	/**
+	 * A wrapper class around the operator coordinator context to allow quiescence.
+	 * When a new operator coordinator is created, we need to quiesce the old
+	 * operator coordinator to prevent it from making any further impact to the
+	 * job master. This is done by quiescing the operator coordinator context.
+	 * After the quiescence, the "reading" methods still delegate to the wrapped context,
+	 * while the "writing" methods become no-ops: {@code sendEvent} returns an already
+	 * completed future without sending anything, and {@code failJob} does nothing.
+	 *
+	 * <p>{@code quiesce()}, {@code sendEvent} and {@code failJob} synchronize on this object,
+	 * so once {@code quiesce()} returns, no send or fail call made through this wrapper is
+	 * still running against the wrapped context.
+	 */
+	@VisibleForTesting
+	static class QuiesceableContext implements OperatorCoordinator.Context {
+		private final OperatorCoordinator.Context context;
+		// Volatile so that isQuiesced() can read it without taking the lock.
+		private volatile boolean quiesced;
+
+		QuiesceableContext(OperatorCoordinator.Context context) {
+			this.context = context;
+			quiesced = false;
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return context.getOperatorId();
+		}
+
+		@Override
+		public synchronized CompletableFuture<Acknowledge> sendEvent(
+				OperatorEvent evt,
+				int targetSubtask) throws TaskNotRunningException {
+			// Do not enter the sending procedure if the context has been quiesced.
+			// The event is dropped, but the caller still sees a successfully completed future.
+			if (quiesced) {
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			}
+			return context.sendEvent(evt, targetSubtask);
+		}
+
+		@Override
+		public synchronized void failJob(Throwable cause) {
+			// A quiesced (outdated) coordinator must not be able to fail the job.
+			if (quiesced) {
+				return;
+			}
+			context.failJob(cause);
+		}
+
+		@Override
+		public int currentParallelism() {
+			return context.currentParallelism();
+		}
+
+		// Synchronized so that it waits for any in-flight sendEvent/failJob call to finish.
+		@VisibleForTesting
+		synchronized void quiesce() {
+			quiesced = true;
+		}
+
+		@VisibleForTesting
+		boolean isQuiesced() {
+			return quiesced;
+		}
+
+		// Used by the enclosing class to build a fresh, non-quiesced wrapper on reset.
+		private OperatorCoordinator.Context getContext() {
+			return context;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index e730a15..c865df5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -25,21 +25,24 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 /**
  * This class is used to coordinate between two components, where one component has an
  * executor following the mailbox model and the other component notifies it when needed.
  */
-public class ExecutorNotifier {
+public class ExecutorNotifier implements AutoCloseable {
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
-	private ScheduledExecutorService workerExecutor;
-	private Executor executorToNotify;
+	private final ScheduledExecutorService workerExecutor;
+	private final Executor executorToNotify;
+	private final AtomicBoolean closed;
 
 	public ExecutorNotifier(ScheduledExecutorService workerExecutor,
 							Executor executorToNotify) {
 		this.executorToNotify = executorToNotify;
 		this.workerExecutor = workerExecutor;
+		this.closed = new AtomicBoolean(false);
 	}
 
 	/**
@@ -134,4 +137,20 @@ public class ExecutorNotifier {
 			}
 		}, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
 	}
+
+	/**
+	 * Closes the executor notifier by shutting down its worker executor. This is a blocking
+	 * call: it returns only once the worker executor has terminated.
+	 *
+	 * <p>The worker executor is shut down with {@link ScheduledExecutorService#shutdownNow()},
+	 * so queued worker tasks that have not started yet are discarded and running ones are
+	 * interrupted rather than awaited. The executor to notify is not shut down here. Only the
+	 * first call has an effect; later calls just log and return.
+	 *
+	 * @throws InterruptedException when interrupted while waiting for the worker executor to
+	 *                              terminate.
+	 */
+	@Override
+	public void close() throws InterruptedException {
+		if (!closed.compareAndSet(false, true)) {
+			LOG.debug("The executor notifier has been closed.");
+			return;
+		}
+		// Shutdown the worker executor, so no more worker tasks can run.
+		workerExecutor.shutdownNow();
+		workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index dc5f1dd..3aa095d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -113,6 +113,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 		boolean successfullyClosed = false;
 		try {
 			if (started) {
+				context.close();
 				enumerator.close();
 			}
 		} finally {
@@ -202,10 +203,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 					"only be reset to a checkpoint before it starts.", operatorName));
 		}
 		LOG.info("Resetting coordinator of source {} from checkpoint.", operatorName);
-		if (started) {
-			enumerator.close();
-		}
-		LOG.info("Resetting SourceCoordinator from checkpoint.");
 		fromBytes(checkpointData);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 47fe564..322778a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
@@ -77,7 +78,8 @@ import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerde
  * @param <SplitT> the type of the splits.
  */
 @Internal
-public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+public class SourceCoordinatorContext<SplitT extends SourceSplit>
+		implements SplitEnumeratorContext<SplitT>, AutoCloseable {
 	private final ExecutorService coordinatorExecutor;
 	private final ExecutorNotifier notifier;
 	private final OperatorCoordinator.Context operatorCoordinatorContext;
@@ -192,6 +194,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> implements Spl
 		notifier.notifyReadyAsync(callable, handler);
 	}
 
+	/**
+	 * Closes this context. The {@link ExecutorNotifier} is closed first, then the coordinator
+	 * executor is shut down. Blocks until the coordinator executor has terminated.
+	 *
+	 * @throws InterruptedException when interrupted while closing the notifier or while
+	 *                              waiting for the coordinator executor to terminate.
+	 */
+	@Override
+	public void close() throws InterruptedException {
+		try {
+			notifier.close();
+		} finally {
+			// Always shut down the coordinator executor, even if closing the notifier failed
+			// or was interrupted, so that its thread is not leaked.
+			coordinatorExecutor.shutdown();
+		}
+		coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+	}
+
 	// --------- Package private additional methods for the SourceCoordinator ------------
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 77ad7a3..8ef3f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import java.util.concurrent.Callable;
@@ -33,10 +34,9 @@ import java.util.function.BiConsumer;
 /**
  * The provider of {@link SourceCoordinator}.
  */
-public class SourceCoordinatorProvider<SplitT extends SourceSplit>
-		implements OperatorCoordinator.Provider {
+public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends RecreateOnResetOperatorCoordinator.Provider {
+	private static final long serialVersionUID = -1921681440009738462L;
 	private final String operatorName;
-	private final OperatorID operatorID;
 	private final Source<?, SplitT, ?> source;
 	private final int numWorkerThreads;
 
@@ -56,25 +56,21 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
 			OperatorID operatorID,
 			Source<?, SplitT, ?> source,
 			int numWorkerThreads) {
+		super(operatorID);
 		this.operatorName = operatorName;
-		this.operatorID = operatorID;
 		this.source = source;
 		this.numWorkerThreads = numWorkerThreads;
 	}
 
 	@Override
-	public OperatorID getOperatorId() {
-		return operatorID;
-	}
-
-	@Override
-	public OperatorCoordinator create(OperatorCoordinator.Context context) {
+	public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
 		final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
 		CoordinatorExecutorThreadFactory coordinatorThreadFactory =
 				new CoordinatorExecutorThreadFactory(coordinatorThreadName);
 		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
 		SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
-				new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, context);
+				new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads,
+						context);
 		return new SourceCoordinator<>(operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
new file mode 100644
index 0000000..f4298ce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -0,0 +1,119 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link RecreateOnResetOperatorCoordinator}.
+ */
+public class RecreateOnResetOperatorCoordinatorTest {
+	private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+	private static final int NUM_SUBTASKS = 1;
+
+	/** A context that is not quiesced forwards both reading and writing calls. */
+	@Test
+	public void testQuiesceableContextNotQuiesced() throws TaskNotRunningException {
+		MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+		RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
+				new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);
+
+		TestingEvent event = new TestingEvent();
+		quiesceableContext.sendEvent(event, 0);
+		quiesceableContext.failJob(new Exception());
+
+		assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
+		assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
+		assertEquals(Collections.singletonList(event), context.getEventsToOperatorBySubtaskId(0));
+		assertTrue(context.isJobFailed());
+	}
+
+	/** After quiescing, writing calls are dropped while reading calls still work. */
+	@Test
+	public void testQuiescedContext() throws TaskNotRunningException {
+		MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+		RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
+				new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);
+
+		quiesceableContext.quiesce();
+		quiesceableContext.sendEvent(new TestingEvent(), 0);
+		quiesceableContext.failJob(new Exception());
+
+		assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
+		assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
+		assertTrue(context.getEventsToOperator().isEmpty());
+		assertFalse(context.isJobFailed());
+	}
+
+	/**
+	 * Resetting quiesces the old context, does not reset the old internal coordinator, and
+	 * restores the state into a newly created internal coordinator bound to a fresh,
+	 * non-quiesced context.
+	 */
+	@Test
+	public void testResetToCheckpoint() throws Exception {
+		TestingCoordinatorProvider provider = new TestingCoordinatorProvider();
+		MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+		RecreateOnResetOperatorCoordinator coordinator = createCoordinator(provider, context);
+
+		RecreateOnResetOperatorCoordinator.QuiesceableContext contextBeforeReset = coordinator.getQuiesceableContext();
+		TestingOperatorCoordinator internalCoordinatorBeforeReset = getInternalCoordinator(coordinator);
+
+		byte[] stateToRestore = new byte[0];
+		coordinator.resetToCheckpoint(stateToRestore);
+
+		assertTrue(contextBeforeReset.isQuiesced());
+		assertNull(internalCoordinatorBeforeReset.getLastRestoredCheckpointState());
+
+		// The recreated coordinator must be able to reach the job master again: a reset that
+		// reused the quiesced context would silently drop all of its events and failures.
+		assertFalse(coordinator.getQuiesceableContext().isQuiesced());
+
+		TestingOperatorCoordinator internalCoordinatorAfterReset = getInternalCoordinator(coordinator);
+		// assertEquals on arrays compares references, i.e. it checks the exact array passed in.
+		assertEquals(stateToRestore, internalCoordinatorAfterReset.getLastRestoredCheckpointState());
+	}
+
+	// ---------------
+
+	private RecreateOnResetOperatorCoordinator createCoordinator(
+			TestingCoordinatorProvider provider,
+			OperatorCoordinator.Context context) {
+		return (RecreateOnResetOperatorCoordinator) provider.create(context);
+	}
+
+	private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) {
+		return (TestingOperatorCoordinator) coordinator.getInternalCoordinator();
+	}
+
+	// ---------------
+
+	/** Provider creating a new {@link TestingOperatorCoordinator} on every call. */
+	private static class TestingCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {
+		private static final long serialVersionUID = 4184184580789587013L;
+
+		public TestingCoordinatorProvider() {
+			super(OPERATOR_ID);
+		}
+
+		@Override
+		protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
+			return new TestingOperatorCoordinator(context);
+		}
+	}
+
+	private static class TestingEvent implements OperatorEvent {
+		private static final long serialVersionUID = -3289352911927668275L;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
new file mode 100644
index 0000000..2412a251
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -0,0 +1,108 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for ExecutorNotifier.
+ */
+public class ExecutorNotifierTest {
+	private ScheduledExecutorService workerExecutor;
+	private ExecutorService executorToNotify;
+	private ExecutorNotifier notifier;
+	private Throwable exceptionInHandler;
+
+	@Before
+	public void setup() {
+		this.exceptionInHandler = null;
+		this.workerExecutor = Executors.newSingleThreadScheduledExecutor(r ->
+				new Thread(r, "worker-thread"));
+		this.executorToNotify = Executors.newSingleThreadExecutor(r -> {
+			Thread t = new Thread(r, "main-thread");
+			t.setUncaughtExceptionHandler((thread, e) -> exceptionInHandler = e);
+			return t;
+		});
+		this.notifier = new ExecutorNotifier(
+				this.workerExecutor,
+				this.executorToNotify);
+	}
+
+	@After
+	public void tearDown() throws InterruptedException {
+		notifier.close();
+		closeExecutorToNotify();
+	}
+
+	@Test
+	public void testBasic() throws InterruptedException {
+		CountDownLatch latch = new CountDownLatch(1);
+		AtomicInteger result = new AtomicInteger(0);
+		notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
+			result.set(v);
+			latch.countDown();
+		});
+		latch.await();
+		closeExecutorToNotify();
+		assertEquals(1234, result.get());
+	}
+
+	@Test
+	public void testExceptionInCallable() {
+		Exception exception = new Exception("Expected exception.");
+		notifier.notifyReadyAsync(
+				() -> {
+					throw exception;
+				},
+				(v, e) -> {
+					assertEquals(exception, e);
+					assertNull(v);
+				});
+	}
+
+	@Test
+	public void testExceptionInHandler() throws InterruptedException {
+		CountDownLatch latch = new CountDownLatch(1);
+		RuntimeException exception =  new RuntimeException("Expected exception.");
+		notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
+			latch.countDown();
+			throw exception;
+		});
+		latch.await();
+		closeExecutorToNotify();
+		assertEquals(exception, exceptionInHandler);
+	}
+
+	private void closeExecutorToNotify() throws InterruptedException {
+		executorToNotify.shutdown();
+		executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
new file mode 100644
index 0000000..c86981c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -0,0 +1,99 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SourceCoordinatorProvider}.
+ */
+public class SourceCoordinatorProviderTest {
+	private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+	private static final int NUM_SPLITS = 10;
+	private SourceCoordinatorProvider<MockSourceSplit> provider;
+
+	@Before
+	public void setup() {
+		provider = new SourceCoordinatorProvider<>(
+				"SourceCoordinatorProviderTest",
+				OPERATOR_ID,
+				new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
+				1);
+	}
+
+	/** The provider must wrap the source coordinator so that it is recreated on reset. */
+	@Test
+	public void testCreate() throws Exception {
+		OperatorCoordinator coordinator =
+				provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS));
+		try {
+			assertTrue(coordinator instanceof RecreateOnResetOperatorCoordinator);
+		} finally {
+			// Close the coordinator so that resources created along with it can be released.
+			coordinator.close();
+		}
+	}
+
+	/** Resetting must replace the source coordinator with one restored from the checkpoint. */
+	@Test
+	public void testCheckpointAndReset() throws Exception {
+		final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS);
+		final RecreateOnResetOperatorCoordinator coordinator =
+				(RecreateOnResetOperatorCoordinator) provider.create(context);
+		try {
+			final SourceCoordinator<?, ?> sourceCoordinator =
+					(SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+
+			// Start the coordinator.
+			coordinator.start();
+			// register reader 0 and take a checkpoint.
+			coordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location"));
+			CompletableFuture<byte[]> future = new CompletableFuture<>();
+			coordinator.checkpointCoordinator(0L, future);
+			byte[] bytes = future.get();
+
+			// Register reader 1.
+			coordinator.handleEventFromOperator(1, new ReaderRegistrationEvent(1, "location"));
+			// Wait until the coordinator context is updated with registration of reader 1.
+			while (sourceCoordinator.getContext().registeredReaders().size() < 2) {
+				Thread.sleep(1);
+			}
+
+			// reset the coordinator to the checkpoint which only contains reader 0.
+			coordinator.resetToCheckpoint(bytes);
+			final SourceCoordinator<?, ?> restoredSourceCoordinator =
+					(SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+			// SourceCoordinator does not override equals(), so this compares identity.
+			assertNotEquals("The restored source coordinator should be a different instance",
+					restoredSourceCoordinator, sourceCoordinator);
+			assertEquals("There should only be one registered reader.",
+					1, restoredSourceCoordinator.getContext().registeredReaders().size());
+			assertNotNull("The only registered reader should be reader 0",
+					restoredSourceCoordinator.getContext().registeredReaders().get(0));
+		} finally {
+			// Close whichever internal coordinator is current (the one replaced by the reset
+			// has already been closed), so the started coordinator's threads do not leak.
+			coordinator.close();
+		}
+	}
+
+}