You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/05/05 15:20:04 UTC

[flink] branch master updated: [FLINK-15101][connector/common] Add the SourceCoordinator implementation

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ed40049  [FLINK-15101][connector/common] Add the SourceCoordinator implementation
ed40049 is described below

commit ed400497e56ea272722ac71697edf830b2d682ae
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Mar 23 08:53:31 2020 +0800

    [FLINK-15101][connector/common] Add the SourceCoordinator implementation
---
 .../flink/api/connector/source/ReaderInfo.java     |  15 ++
 .../api/connector/source/SplitEnumerator.java      |   3 +-
 .../connector/source/SplitEnumeratorContext.java   |  26 +-
 .../api/connector/source/mocks/MockSource.java     |  75 ++++++
 .../connector/source/mocks/MockSourceSplit.java    |   2 +-
 .../source/mocks/MockSplitEnumerator.java          | 139 +++++++++++
 .../MockSplitEnumeratorCheckpointSerializer.java   |  55 ++++
 .../source/coordinator/ExecutorNotifier.java       | 137 ++++++++++
 .../source/coordinator/SourceCoordinator.java      | 278 +++++++++++++++++++++
 .../coordinator/SourceCoordinatorContext.java      | 274 ++++++++++++++++++++
 .../coordinator/SourceCoordinatorProvider.java     | 112 +++++++++
 .../coordinator/SourceCoordinatorSerdeUtils.java   | 179 +++++++++++++
 .../source/coordinator/SplitAssignmentTracker.java | 164 ++++++++++++
 .../flink/runtime/source/event/AddSplitEvent.java  |  38 ++-
 .../source/event/ReaderRegistrationEvent.java      |  29 +--
 .../runtime/source/event/SourceEventWrapper.java   |  36 ++-
 .../MockOperatorCoordinatorContext.java            | 105 ++++++++
 .../source/coordinator/CoordinatorTestUtils.java   |  84 +++++++
 .../coordinator/SourceCoordinatorContextTest.java  | 188 ++++++++++++++
 .../source/coordinator/SourceCoordinatorTest.java  | 214 ++++++++++++++++
 .../coordinator/SourceCoordinatorTestBase.java     |  86 +++++++
 .../coordinator/SplitAssignmentTrackerTest.java    | 173 +++++++++++++
 22 files changed, 2341 insertions(+), 71 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index f899b12..4a048d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.connector.source;
 import org.apache.flink.annotation.Public;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * A container class hosting the information of a {@link SourceReader}.
@@ -48,4 +49,18 @@ public final class ReaderInfo implements Serializable {
 	public String getLocation() {
 		return location;
 	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(subtaskId, location);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof ReaderInfo)) {
+			return false;
+		}
+		ReaderInfo other = (ReaderInfo) obj;
+		return subtaskId == other.subtaskId && location.equals(other.location);
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index bdaee36..16b3938 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -66,8 +66,9 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extend
 	 * Checkpoints the state of this split enumerator.
 	 *
 	 * @return an object containing the state of the split enumerator.
+	 * @throws Exception when the snapshot cannot be taken.
 	 */
-	CheckpointT snapshotState();
+	CheckpointT snapshotState() throws Exception;
 
 	/**
 	 * Called to close the enumerator, in case it holds on to any resources, like threads or
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 0dd004a..db6ccad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -48,11 +48,13 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 	void sendEventToSourceReader(int subtaskId, SourceEvent event);
 
 	/**
-	 * Get the number of subtasks.
+	 * Get the current parallelism of this Source. Note that due to auto-scaling, the parallelism
+	 * may change over time. Therefore the SplitEnumerator should not cache the return value
+	 * of this method, but always invoke this method to get the latest parallelism.
 	 *
-	 * @return the number of subtasks.
+	 * @return the parallelism of the Source.
 	 */
-	int numSubtasks();
+	int currentParallelism();
 
 	/**
 	 * Get the currently registered readers. The mapping is from subtask id to the reader info.
@@ -70,10 +72,12 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 
 	/**
 	 * Invoke the callable and handover the return value to the handler which will be executed
-	 * by the source coordinator.
+	 * by the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>s
+	 * may be executed in a thread pool concurrently.
 	 *
-	 * <p>It is important to make sure that the callable should not modify
-	 * any shared state. Otherwise the there might be unexpected behavior.
+	 * <p>It is important to make sure that the callable does not modify any shared state, especially
+	 * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the
+	 * there might be unexpected behavior.
 	 *
 	 * @param callable a callable to call.
 	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
@@ -81,11 +85,13 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 	<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);
 
 	/**
-	 * Invoke the callable and handover the return value to the handler which will be executed
-	 * by the source coordinator.
+	 * Invoke the given callable periodically and handover the return value to the handler which will
+	 * be executed by the source coordinator. When this method is invoked multiple times, The
+	 * <code>Coallble</code>s may be executed in a thread pool concurrently.
 	 *
-	 * <p>It is important to make sure that the callable should not modify
-	 * any shared state. Otherwise the there might be unexpected behavior.
+	 * <p>It is important to make sure that the callable does not modify any shared state, especially
+	 * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the
+	 * there might be unexpected behavior.
 	 *
 	 * @param callable the callable to call.
 	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
new file mode 100644
index 0000000..f38ca60
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -0,0 +1,75 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A mock {@link Source} for unit tests.
+ */
+public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSourceSplit>> {
+	private final Boundedness boundedness;
+	private final int numSplits;
+
+	public MockSource(Boundedness boundedness, int numSplits) {
+		this.boundedness = boundedness;
+		this.numSplits = numSplits;
+	}
+
+	@Override
+	public Boundedness getBoundedness() {
+		return boundedness;
+	}
+
+	@Override
+	public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext) {
+		return new MockSplitEnumerator(numSplits, enumContext);
+	}
+
+	@Override
+	public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(
+			SplitEnumeratorContext<MockSourceSplit> enumContext,
+			Set<MockSourceSplit> checkpoint) throws IOException {
+		return new MockSplitEnumerator(checkpoint, enumContext);
+	}
+
+	@Override
+	public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
+		return new MockSourceSplitSerializer();
+	}
+
+	@Override
+	public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
+		return new MockSplitEnumeratorCheckpointSerializer();
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index 08a7ed8..9dbc9de 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -90,7 +90,7 @@ public class MockSourceSplit implements SourceSplit, Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(id, records.toArray(new Integer[0]), endIndex, index);
+		return Objects.hash(id, Arrays.hashCode(records.toArray(new Integer[0])), endIndex, index);
 	}
 
 	@Override
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
new file mode 100644
index 0000000..64ec8af
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -0,0 +1,139 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A mock {@link SplitEnumerator} for unit tests.
+ */
+public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
+	private SortedSet<MockSourceSplit> unassignedSplits;
+	private SplitEnumeratorContext<MockSourceSplit> enumContext;
+	private List<SourceEvent> handledSourceEvent;
+	private boolean started;
+	private boolean closed;
+
+	public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) {
+		this(new HashSet<>(), enumContext);
+		for (int i = 0; i < numSplits; i++) {
+			unassignedSplits.add(new MockSourceSplit(i));
+		}
+	}
+
+	public MockSplitEnumerator(
+			Set<MockSourceSplit> unassignedSplits,
+			SplitEnumeratorContext<MockSourceSplit> enumContext) {
+		this.unassignedSplits = new TreeSet<>(Comparator.comparingInt(o -> Integer.parseInt(o.splitId())));
+		this.unassignedSplits.addAll(unassignedSplits);
+		this.enumContext = enumContext;
+		this.handledSourceEvent = new ArrayList<>();
+		this.started = false;
+		this.closed = false;
+	}
+
+	@Override
+	public void start() {
+		this.started = true;
+	}
+
+	@Override
+	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+		handledSourceEvent.add(sourceEvent);
+	}
+
+	@Override
+	public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
+		unassignedSplits.addAll(splits);
+	}
+
+	@Override
+	public void addReader(int subtaskId) {
+		List<MockSourceSplit> assignment = new ArrayList<>();
+		for (MockSourceSplit split : unassignedSplits) {
+			if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) {
+				assignment.add(split);
+			}
+		}
+		enumContext.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtaskId, assignment)));
+		unassignedSplits.removeAll(assignment);
+	}
+
+	@Override
+	public Set<MockSourceSplit> snapshotState() {
+		return unassignedSplits;
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.closed = true;
+	}
+
+	public void addNewSplits(List<MockSourceSplit> newSplits) {
+		unassignedSplits.addAll(newSplits);
+		assignAllSplits();
+	}
+
+	// --------------------
+
+	public boolean started() {
+		return started;
+	}
+
+	public boolean closed() {
+		return closed;
+	}
+
+	public Set<MockSourceSplit> getUnassignedSplits() {
+		return unassignedSplits;
+	}
+
+	public List<SourceEvent> getHandledSourceEvent() {
+		return handledSourceEvent;
+	}
+
+	// --------------------
+
+	private void assignAllSplits() {
+		Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
+		unassignedSplits.forEach(split -> {
+			int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
+			if (enumContext.registeredReaders().containsKey(subtaskId)) {
+				assignment.computeIfAbsent(subtaskId, ignored -> new ArrayList<>()).add(split);
+			}
+		});
+		enumContext.assignSplits(new SplitsAssignment<>(assignment));
+		assignment.values().forEach(l -> unassignedSplits.removeAll(l));
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
new file mode 100644
index 0000000..4879b18
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
@@ -0,0 +1,55 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serializer for the checkpoint of {@link MockSplitEnumerator}.
+ */
+public class MockSplitEnumeratorCheckpointSerializer implements SimpleVersionedSerializer<Set<MockSourceSplit>> {
+
+	@Override
+	public int getVersion() {
+		return 0;
+	}
+
+	@Override
+	public byte[] serialize(Set<MockSourceSplit> obj) throws IOException {
+		return InstantiationUtil.serializeObject(new ArrayList<>(obj));
+	}
+
+	@Override
+	public Set<MockSourceSplit> deserialize(int version, byte[] serialized) throws IOException {
+		try {
+			ArrayList<MockSourceSplit> list = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+			return new HashSet<>(list);
+		} catch (ClassNotFoundException e) {
+			throw new FlinkRuntimeException("Failed to deserialize the enumerator checkpoint.");
+		}
+	}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
new file mode 100644
index 0000000..e730a15
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -0,0 +1,137 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * This class is used to coordinate between two components, where one component has an
+ * executor following the mailbox model and the other component notifies it when needed.
+ */
+public class ExecutorNotifier {
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
+	private ScheduledExecutorService workerExecutor;
+	private Executor executorToNotify;
+
+	public ExecutorNotifier(ScheduledExecutorService workerExecutor,
+							Executor executorToNotify) {
+		this.executorToNotify = executorToNotify;
+		this.workerExecutor = workerExecutor;
+	}
+
+	/**
+	 * Call the given callable once. Notify the {@link #executorToNotify} to execute
+	 * the handler.
+	 *
+	 * <p>Note that when this method is invoked multiple times, it is possible that
+	 * multiple callables are executed concurrently, so do the handlers. For example,
+	 * assuming both the workerExecutor and executorToNotify are single threaded.
+	 * The following code may still throw a <code>ConcurrentModificationException</code>.
+	 *
+	 * <pre>{@code
+	 *  final List<Integer> list = new ArrayList<>();
+	 *
+	 *  // The callable adds an integer 1 to the list, while it works at the first glance,
+	 *  // A ConcurrentModificationException may be thrown because the caller and
+	 *  // handler may modify the list at the same time.
+	 *  notifier.notifyReadyAsync(
+	 *  	() -> list.add(1),
+	 *  	(ignoredValue, ignoredThrowable) -> list.add(2));
+	 * }</pre>
+	 *
+	 * <p>Instead, the above logic should be implemented in as:
+	 * <pre>{@code
+	 *  // Modify the state in the handler.
+	 *  notifier.notifyReadyAsync(() -> 1, (v, ignoredThrowable) -> {
+	 *  	list.add(v));
+	 *  	list.add(2);
+	 *  });
+	 * }</pre>
+	 *
+	 * @param callable the callable to invoke before notifying the executor.
+	 * @param handler the handler to handle the result of the callable.
+	 */
+	public <T> void notifyReadyAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+		workerExecutor.execute(() -> {
+			try {
+				T result = callable.call();
+				executorToNotify.execute(() -> handler.accept(result, null));
+			} catch (Throwable t) {
+				LOG.error("Unexpected exception {}", t);
+				handler.accept(null, t);
+			}
+		});
+	}
+
+	/**
+	 * Call the given callable once. Notify the {@link #executorToNotify} to execute
+	 * the handler.
+	 *
+	 * <p>Note that when this method is invoked multiple times, it is possible that
+	 * multiple callables are executed concurrently, so do the handlers. For example,
+	 * assuming both the workerExecutor and executorToNotify are single threaded.
+	 * The following code may still throw a <code>ConcurrentModificationException</code>.
+	 *
+	 * <pre>{@code
+	 *  final List<Integer> list = new ArrayList<>();
+	 *
+	 *  // The callable adds an integer 1 to the list, while it works at the first glance,
+	 *  // A ConcurrentModificationException may be thrown because the caller and
+	 *  // handler may modify the list at the same time.
+	 *  notifier.notifyReadyAsync(
+	 *  	() -> list.add(1),
+	 *  	(ignoredValue, ignoredThrowable) -> list.add(2));
+	 * }</pre>
+	 *
+	 * <p>Instead, the above logic should be implemented in as:
+	 * <pre>{@code
+	 *  // Modify the state in the handler.
+	 *  notifier.notifyReadyAsync(() -> 1, (v, ignoredThrowable) -> {
+	 *  	list.add(v));
+	 *  	list.add(2);
+	 *  });
+	 * }</pre>
+	 *
+	 * @param callable the callable to execute before notifying the executor to notify.
+	 * @param handler the handler that handles the result from the callable.
+	 * @param initialDelayMs the initial delay in ms before invoking the given callable.
+	 * @param periodMs the interval in ms to invoke the callable.
+	 */
+	public <T> void notifyReadyAsync(
+			Callable<T> callable,
+			BiConsumer<T, Throwable> handler,
+			long initialDelayMs,
+			long periodMs) {
+		workerExecutor.scheduleAtFixedRate(() -> {
+			try {
+				T result = callable.call();
+				executorToNotify.execute(() -> handler.accept(result, null));
+			} catch (Throwable t) {
+				handler.accept(null, t);
+			}
+		}, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
new file mode 100644
index 0000000..4fc2aeb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -0,0 +1,278 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);
+	/** The name of the operator this SourceCoordinator is associated with. */
+	private final String operatorName;
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			String operatorName,
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.operatorName = operatorName;
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator for source {}.", operatorName);
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		LOG.info("Closing SourceCoordinator for source {}.", operatorName);
+		boolean successfullyClosed = false;
+		try {
+			if (started) {
+				enumerator.close();
+			}
+		} finally {
+			coordinatorExecutor.shutdownNow();
+			// We do not expect this to actually block for long. At this point, there should be very few task running
+			// in the executor, if any.
+			successfullyClosed = coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
+		}
+		if (!successfullyClosed) {
+			throw new TimeoutException("The source coordinator failed to close before timeout.");
+		}
+		LOG.info("Source coordinator for source {} closed.", operatorName);
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			try {
+				LOG.debug("Handling event from subtask {} of source {}: {}", subtask, operatorName, event);
+				if (event instanceof SourceEventWrapper) {
+					enumerator.handleSourceEvent(subtask, ((SourceEventWrapper) event).getSourceEvent());
+				} else if (event instanceof ReaderRegistrationEvent) {
+					handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+				}
+			} catch (Exception e) {
+				LOG.error("Failing the job due to exception when handling operator event {} from subtask {} " +
+								"of source {}.", event, subtask, operatorName, e);
+				context.failJob(e);
+			}
+		});
+	}
+
+	@Override
+	public void subtaskFailed(int subtaskId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			try {
+				LOG.info("Handling subtask {} failure of source {}.", subtaskId, operatorName);
+				List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId);
+				context.unregisterSourceReader(subtaskId);
+				LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, operatorName);
+				enumerator.addSplitsBack(splitsToAddBack, subtaskId);
+			} catch (Exception e) {
+				LOG.error("Failing the job due to exception when handling subtask {} failure in source {}.",
+						subtaskId, operatorName, e);
+				context.failJob(e);
+			}
+		});
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+		ensureStarted();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);
+				return toBytes(checkpointId);
+			} catch (Exception e) {
+				throw new CompletionException(
+						String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e);
+			}
+		}, coordinatorExecutor);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			try {
+				LOG.info("Marking checkpoint {} as completed for source {}.", checkpointId, operatorName);
+				context.onCheckpointComplete(checkpointId);
+			} catch (Exception e) {
+				LOG.error("Failing the job due to exception when completing the checkpoint {} for source {}.",
+						checkpointId, operatorName, e);
+				context.failJob(e);
+			}
+		});
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		if (started) {
+			throw new IllegalStateException(String.format(
+					"The coordinator for source %s has started. The source coordinator state can " +
+					"only be reset to a checkpoint before it starts.", operatorName));
+		}
+		LOG.info("Resetting coordinator of source {} from checkpoint.", operatorName);
+		if (started) {
+			enumerator.close();
+		}
+		LOG.info("Resetting SourceCoordinator from checkpoint.");
+		fromBytes(checkpointData);
+	}
+
+	// ---------------------------------------------------
+	@VisibleForTesting
+	SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
+		return enumerator;
+	}
+
+	@VisibleForTesting
+	SourceCoordinatorContext<SplitT> getContext() {
+		return context;
+	}
+
+	// --------------------- Serde -----------------------
+
+	/**
+	 * Serialize the coordinator state. The current implementation may not be super efficient,
+	 * but it should not matter that much because most of the state should be rather small.
+	 * Large states themselves may already be a problem regardless of how the serialization
+	 * is implemented.
+	 *
+	 * @return A byte array containing the serialized state of the source coordinator.
+	 * @throws Exception When something goes wrong in serialization.
+	 */
+	private byte[] toBytes(long checkpointId) throws Exception {
+		EnumChkT enumCkpt = enumerator.snapshotState();
+
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+			writeCoordinatorSerdeVersion(out);
+			out.writeInt(enumCheckpointSerializer.getVersion());
+			byte[] serialziedEnumChkpt = enumCheckpointSerializer.serialize(enumCkpt);
+			out.writeInt(serialziedEnumChkpt.length);
+			out.write(serialziedEnumChkpt);
+			context.snapshotState(checkpointId, splitSerializer, out);
+			out.flush();
+			return baos.toByteArray();
+		}
+	}
+
+	/**
+	 * Restore the state of this source coordinator from the state bytes.
+	 *
+	 * @param bytes The checkpoint bytes that was returned from {@link #toBytes(long)}
+	 * @throws Exception When the deserialization failed.
+	 */
+	private void fromBytes(byte[] bytes) throws Exception {
+		try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+				DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+			readAndVerifyCoordinatorSerdeVersion(in);
+			int enumSerializerVersion = in.readInt();
+			int serializedEnumChkptSize = in.readInt();
+			byte[] serializedEnumChkpt = readBytes(in, serializedEnumChkptSize);
+			EnumChkT enumChkpt = enumCheckpointSerializer.deserialize(enumSerializerVersion, serializedEnumChkpt);
+			context.restoreState(splitSerializer, in);
+			enumerator = source.restoreEnumerator(context, enumChkpt);
+		}
+	}
+
+	// --------------------- private methods -------------
+
+	private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
+		context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
+		enumerator.addReader(event.subtaskId());
+	}
+
+	private void ensureStarted() {
+		if (!started) {
+			throw new IllegalStateException("The coordinator has not started yet.");
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
new file mode 100644
index 0000000..47fe564
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -0,0 +1,274 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeRegisteredReaders;
+
+/**
+ * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext} this class
+ * allows interaction with state and sending {@link OperatorEvent} to the SourceOperator while
+ * {@link SplitEnumeratorContext} only allows sending {@link SourceEvent}.
+ *
+ * <p>The context serves a few purposes:
+ * <ul>
+ *     <li>
+ *         Information provider - The context provides necessary information to the enumerator for it to
+ *         know what is the status of the source readers and their split assignments. These information
+ *         allows the split enumerator to do the coordination.
+ *     </li>
+ *     <li>
+ *         Action taker - The context also provides a few actions that the enumerator can take to carry
+ *         out the coordination. So far there are two actions: 1) assign splits to the source readers.
+ *         and 2) sens a custom {@link SourceEvent SourceEvents} to the source readers.
+ *     </li>
+ *     <li>
+ *         Thread model enforcement - The context ensures that all the manipulations to the coordinator state
+ *         are handled by the same thread.
+ *     </li>
+ * </ul>
+ * @param <SplitT> the type of the splits.
+ */
+@Internal
+public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+	private final ExecutorService coordinatorExecutor;
+	private final ExecutorNotifier notifier;
+	private final OperatorCoordinator.Context operatorCoordinatorContext;
+	private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+	private final SplitAssignmentTracker<SplitT> assignmentTracker;
+	private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+	private final String coordinatorThreadName;
+
+	public SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext) {
+		this(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, operatorCoordinatorContext,
+				new SplitAssignmentTracker<>());
+	}
+
+	// Package private method for unit test.
+	SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext,
+			SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.coordinatorThreadFactory = coordinatorThreadFactory;
+		this.operatorCoordinatorContext = operatorCoordinatorContext;
+		this.registeredReaders = new ConcurrentHashMap<>();
+		this.assignmentTracker = splitAssignmentTracker;
+		this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
+		this.notifier = new ExecutorNotifier(
+				Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
+					private int index = 0;
+					@Override
+					public Thread newThread(Runnable r) {
+						return new Thread(r, coordinatorThreadName + "-worker-" + index++);
+					}
+				}),
+				coordinatorExecutor);
+	}
+
+	@Override
+	public MetricGroup metricGroup() {
+		return null;
+	}
+
+	@Override
+	public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+		try {
+			operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+		} catch (TaskNotRunningException e) {
+			throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
+					event,
+					subtaskId), e);
+		}
+	}
+
+	@Override
+	public int currentParallelism() {
+		return operatorCoordinatorContext.currentParallelism();
+	}
+
+	@Override
+	public Map<Integer, ReaderInfo> registeredReaders() {
+		return Collections.unmodifiableMap(registeredReaders);
+	}
+
+	@Override
+	public void assignSplits(SplitsAssignment<SplitT> assignment) {
+		// Ensure the split assignment is done by the the coordinator executor.
+		if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+			try {
+				coordinatorExecutor.submit(() -> assignSplits(assignment)).get();
+				return;
+			} catch (InterruptedException | ExecutionException e) {
+				throw new FlinkRuntimeException("Failed to assign splits due to", e);
+			}
+		}
+
+		// Ensure all the subtasks in the assignment have registered.
+		for (Integer subtaskId : assignment.assignment().keySet()) {
+			if (!registeredReaders.containsKey(subtaskId)) {
+				throw new IllegalArgumentException(String.format(
+						"Cannot assign splits %s to subtask %d because the subtask is not registered.",
+						registeredReaders.get(subtaskId), subtaskId));
+			}
+		}
+
+		assignmentTracker.recordSplitAssignment(assignment);
+		assignment.assignment().forEach(
+				(id, splits) -> {
+					try {
+						operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+					} catch (TaskNotRunningException e) {
+						throw new FlinkRuntimeException(String.format(
+								"Failed to assign splits %s to reader %d.", splits, id), e);
+					}
+				});
+	}
+
+	@Override
+	public <T> void callAsync(
+			Callable<T> callable,
+			BiConsumer<T, Throwable> handler,
+			long initialDelay,
+			long period) {
+		notifier.notifyReadyAsync(callable, handler, initialDelay, period);
+	}
+
+	@Override
+	public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+		notifier.notifyReadyAsync(callable, handler);
+	}
+
+	// --------- Package private additional methods for the SourceCoordinator ------------
+
+	/**
+	 * Fail the job with the given cause.
+	 *
+	 * @param cause the cause of the job failure.
+	 */
+	void failJob(Throwable cause) {
+		operatorCoordinatorContext.failJob(cause);
+	}
+
+	/**
+	 * Take a snapshot of this SourceCoordinatorContext.
+	 *
+	 * @param checkpointId The id of the ongoing checkpoint.
+	 * @param splitSerializer The serializer of the splits.
+	 * @param out An ObjectOutput that can be used to
+	 */
+	void snapshotState(
+			long checkpointId,
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			DataOutputStream out) throws Exception {
+		writeRegisteredReaders(registeredReaders, out);
+		assignmentTracker.snapshotState(checkpointId, splitSerializer, out);
+	}
+
+	/**
+	 * Restore the state of the context.
+	 * @param splitSerializer the serializer for the SourceSplits.
+	 * @param in the input from which the states are read.
+	 * @throws Exception when the restoration failed.
+	 */
+	@SuppressWarnings("unchecked")
+	void restoreState(
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			DataInputStream in) throws Exception {
+		Map<Integer, ReaderInfo> readers = readRegisteredReaders(in);
+		registeredReaders.clear();
+		registeredReaders.putAll(readers);
+		assignmentTracker.restoreState(splitSerializer, in);
+	}
+
+	/**
+	 * Register a source reader.
+	 *
+	 * @param readerInfo the reader information of the source reader.
+	 */
+	void registerSourceReader(ReaderInfo readerInfo) {
+		registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+	}
+
+	/**
+	 * Unregister a source reader.
+	 *
+	 * @param subtaskId the subtask id of the source reader.
+	 */
+	void unregisterSourceReader(int subtaskId) {
+		Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format(
+				"Failed to unregister source reader of id %s because it is not registered.", subtaskId));
+	}
+
+	/**
+	 * Get the split to put back. This only happens when a source reader subtask has failed.
+	 *
+	 * @param failedSubtaskId the failed subtask id.
+	 * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
+	 */
+	List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
+		return assignmentTracker.getAndRemoveUncheckpointedAssignment(failedSubtaskId);
+	}
+
+	/**
+	 * Invoked when a successful checkpoint has been taken.
+	 *
+	 * @param checkpointId the id of the successful checkpoint.
+	 */
+	void onCheckpointComplete(long checkpointId) {
+		assignmentTracker.onCheckpointComplete(checkpointId);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
new file mode 100644
index 0000000..77ad7a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -0,0 +1,112 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+/**
+ * The provider of {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorProvider<SplitT extends SourceSplit>
+		implements OperatorCoordinator.Provider {
+	private final String operatorName;
+	private final OperatorID operatorID;
+	private final Source<?, SplitT, ?> source;
+	private final int numWorkerThreads;
+
+	/**
+	 * Construct the {@link SourceCoordinatorProvider}.
+	 *
+	 * @param operatorName the name of the operator.
+	 * @param operatorID the ID of the operator this coordinator corresponds to.
+	 * @param source the Source that will be used for this coordinator.
+	 * @param numWorkerThreads the number of threads the should provide to the SplitEnumerator
+	 *                         for doing async calls. See
+	 *                         {@link org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync(Callable, BiConsumer)
+	 *                         SplitEnumeratorContext.callAsync()}.
+	 */
+	public SourceCoordinatorProvider(
+			String operatorName,
+			OperatorID operatorID,
+			Source<?, SplitT, ?> source,
+			int numWorkerThreads) {
+		this.operatorName = operatorName;
+		this.operatorID = operatorID;
+		this.source = source;
+		this.numWorkerThreads = numWorkerThreads;
+	}
+
+	@Override
+	public OperatorID getOperatorId() {
+		return operatorID;
+	}
+
+	@Override
+	public OperatorCoordinator create(OperatorCoordinator.Context context) {
+		final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
+		CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+				new CoordinatorExecutorThreadFactory(coordinatorThreadName);
+		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+		SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
+				new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, context);
+		return new SourceCoordinator<>(operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
+	}
+
+	/**
+	 * A thread factory class that provides some helper methods.
+	 */
+	public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
+		private final String coordinatorThreadName;
+		private Thread t;
+
+		CoordinatorExecutorThreadFactory(String coordinatorThreadName) {
+			this.coordinatorThreadName = coordinatorThreadName;
+			this.t = null;
+		}
+
+		@Override
+		public Thread newThread(Runnable r) {
+			if (t != null) {
+				throw new IllegalStateException("Should never happen. This factory should only be used by a " +
+						"SingleThreadExecutor.");
+			}
+			t = new Thread(r, coordinatorThreadName);
+			t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+			return t;
+		}
+
+		String getCoordinatorThreadName() {
+			return coordinatorThreadName;
+		}
+
+		boolean isCurrentThreadCoordinatorThread() {
+			return Thread.currentThread() == t;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
new file mode 100644
index 0000000..2855b2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+
+/**
+ * A serialization util class for the {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorSerdeUtils {
+	/** The current source coordinator serde version. */
+	private static final int CURRENT_VERSION = 0;
+
+	/** Private constructor for utility class. */
+	private SourceCoordinatorSerdeUtils() {}
+
+	/** Write the current serde version. */
+	static void writeCoordinatorSerdeVersion(DataOutputStream out) throws IOException {
+		out.writeInt(CURRENT_VERSION);
+	}
+
+	/** Read and verify the serde version. */
+	static void readAndVerifyCoordinatorSerdeVersion(DataInputStream in) throws IOException {
+		int version = in.readInt();
+		if (version > CURRENT_VERSION) {
+			throw new IOException("Unsupported source coordinator serde version " + version);
+		}
+	}
+
+	/**
+	 * Get serialized size of the registered readers map.
+	 *
+	 * <p>The binary format is following:
+	 * 4 Bytes - num entries.
+	 * N Bytes - entries
+	 * 		4 Bytes - subtask id
+	 * 		N Bytes - reader info, see {@link #writeReaderInfo(ReaderInfo, DataOutputStream)}.
+	 */
+	static void writeRegisteredReaders(Map<Integer, ReaderInfo> registeredReaders, DataOutputStream out) throws IOException {
+		out.writeInt(registeredReaders.size());
+		for (ReaderInfo info : registeredReaders.values()) {
+			writeReaderInfo(info, out);
+		}
+	}
+
+	static Map<Integer, ReaderInfo> readRegisteredReaders(DataInputStream in) throws IOException {
+		int numReaders = in.readInt();
+		Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
+		for (int i = 0; i < numReaders; i++) {
+			ReaderInfo info = readReaderInfo(in);
+			registeredReaders.put(info.getSubtaskId(), info);
+		}
+		return registeredReaders;
+	}
+
+	/**
+	 * Serialize the assignment by checkpoint ids.
+	 */
+	static <SplitT> void writeAssignmentsByCheckpointId(
+			Map<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentByCheckpointIds,
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			DataOutputStream out) throws IOException {
+		// SplitSerializer version.
+		out.writeInt(splitSerializer.getVersion());
+		// Num checkpoints.
+		out.writeInt(assignmentByCheckpointIds.size());
+		for (Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> assignments : assignmentByCheckpointIds.entrySet()) {
+			long checkpointId = assignments.getKey();
+			out.writeLong(checkpointId);
+
+			int numSubtasks = assignments.getValue().size();
+			out.writeInt(numSubtasks);
+			for (Map.Entry<Integer, LinkedHashSet<SplitT>> assignment : assignments.getValue().entrySet()) {
+				int subtaskId = assignment.getKey();
+				out.writeInt(subtaskId);
+
+				int numAssignedSplits = assignment.getValue().size();
+				out.writeInt(numAssignedSplits);
+				for (SplitT split : assignment.getValue()) {
+					byte[] serializedSplit = splitSerializer.serialize(split);
+					out.writeInt(serializedSplit.length);
+					out.write(serializedSplit);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Deserialize the assignment by checkpoint ids.
+	 */
+	static <SplitT> Map<Long, Map<Integer, LinkedHashSet<SplitT>>> readAssignmentsByCheckpointId(
+			DataInputStream in,
+			SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+		int splitSerializerVersion = in.readInt();
+		int numCheckpoints = in.readInt();
+		Map<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointIds = new HashMap<>(numCheckpoints);
+		for (int i = 0; i < numCheckpoints; i++) {
+			long checkpointId = in.readLong();
+			int numSubtasks = in.readInt();
+			Map<Integer, LinkedHashSet<SplitT>> assignments = new HashMap<>();
+			assignmentsByCheckpointIds.put(checkpointId, assignments);
+			for (int j = 0; j < numSubtasks; j++) {
+				int subtaskId = in.readInt();
+				int numAssignedSplits = in.readInt();
+				LinkedHashSet<SplitT> splits = new LinkedHashSet<>(numAssignedSplits);
+				assignments.put(subtaskId, splits);
+				for (int k = 0; k < numAssignedSplits; k++) {
+					int serializedSplitSize = in.readInt();
+					byte[] serializedSplit = readBytes(in, serializedSplitSize);
+					SplitT split = splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
+					splits.add(split);
+				}
+			}
+		}
+		return assignmentsByCheckpointIds;
+	}
+
+	static byte[] readBytes(DataInputStream in, int size) throws IOException {
+		byte[] bytes = new byte[size];
+		int off = 0;
+		// For ByteArrayInputStream the read should succeed with one shot.
+		while (off < size) {
+			int read = in.read(bytes, off, size - off);
+			if (read < 0) {
+				throw new BufferUnderflowException();
+			}
+			off += read;
+		}
+		return bytes;
+	}
+
+	// ----- private helper methods -----
+
+	/**
+	 * Serialize {@link ReaderInfo}.
+	 *
+	 * <p>The binary format is following:
+	 * 4 Bytes - subtask id
+	 * N Bytes - location string
+	 *
+	 * @param readerInfo the given reader information to serialize.
+	 */
+	private static void writeReaderInfo(ReaderInfo readerInfo, DataOutputStream out) throws IOException {
+		out.writeInt(readerInfo.getSubtaskId());
+		out.writeUTF(readerInfo.getLocation());
+	}
+
+	private static ReaderInfo readReaderInfo(DataInputStream in) throws IOException {
+		int subtaskId = in.readInt();
+		String location = in.readUTF();
+		return new ReaderInfo(subtaskId, location);
+	}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
new file mode 100644
index 0000000..7985f7d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
@@ -0,0 +1,164 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAssignmentsByCheckpointId;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeAssignmentsByCheckpointId;
+
+/**
+ * A class that is responsible for tracking the past split assignments made by
+ * {@link SplitEnumerator}.
+ */
+@Internal
+public class SplitAssignmentTracker<SplitT extends SourceSplit> {
+	// All the split assignments since the last successful checkpoint.
+	// Maintaining this allow the subtasks to fail over independently.
+	// The mapping is [CheckpointId -> [SubtaskId -> LinkedHashSet[SourceSplits]]].
+	private final SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId;
+	// The split assignments since the last checkpoint attempt.
+	// The mapping is [SubtaskId -> LinkedHashSet[SourceSplits]].
+	private Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments;
+
+	public SplitAssignmentTracker() {
+		this.assignmentsByCheckpointId = new TreeMap<>();
+		this.uncheckpointedAssignments = new HashMap<>();
+	}
+
+	/**
+	 * Take a snapshot of the uncheckpointed split assignments.
+	 *
+	 * @param checkpointId the id of the ongoing checkpoint
+	 */
+	public void snapshotState(
+			long checkpointId,
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			DataOutputStream out) throws Exception {
+		// Include the uncheckpointed assignments to the snapshot.
+		assignmentsByCheckpointId.put(checkpointId, uncheckpointedAssignments);
+		uncheckpointedAssignments = new HashMap<>();
+		writeAssignmentsByCheckpointId(assignmentsByCheckpointId, splitSerializer, out);
+	}
+
+	/**
+	 * Restore the state of the SplitAssignmentTracker.
+	 *
+	 * @param splitSerializer The serializer of the splits.
+	 * @param in The ObjectInput that contains the state of the SplitAssignmentTracker.
+	 * @throws Exception when the state deserialization fails.
+	 */
+	@SuppressWarnings("unchecked")
+	public void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in) throws Exception {
+		// Read the split assignments by checkpoint id.
+		Map<Long, Map<Integer, LinkedHashSet<SplitT>>> deserializedAssignments =
+				readAssignmentsByCheckpointId(in, splitSerializer);
+		assignmentsByCheckpointId.putAll(deserializedAssignments);
+	}
+
+	/**
+	 * when a checkpoint has been successfully made, this method is invoked to clean up the assignment
+	 * history before this successful checkpoint.
+	 *
+	 * @param checkpointId the id of the successful checkpoint.
+	 */
+	public void onCheckpointComplete(long checkpointId) {
+		assignmentsByCheckpointId.entrySet().removeIf(entry -> entry.getKey() <= checkpointId);
+	}
+
+	/**
+	 * Record a new split assignment.
+	 *
+	 * @param splitsAssignment the new split assignment.
+	 */
+	public void recordSplitAssignment(SplitsAssignment<SplitT> splitsAssignment) {
+		addSplitAssignment(splitsAssignment, uncheckpointedAssignments);
+	}
+
+	/**
+	 * This method is invoked when a source reader fails over. In this case, the source reader will
+	 * restore its split assignment to the last successful checkpoint. Any split assignment to that
+	 * source reader after the last successful checkpoint will be lost on the source reader side as
+	 * if those splits were never assigned. To handle this case, the coordinator needs to find those
+	 * splits and return them back to the SplitEnumerator for re-assignment.
+	 *
+	 * @param failedSubtaskId the failed subtask id.
+	 * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
+	 */
+	public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
+		List<SplitT> splits = new ArrayList<>();
+		assignmentsByCheckpointId.values().forEach(assignments -> {
+			removeFromAssignment(failedSubtaskId, assignments, splits);
+		});
+		removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
+		return splits;
+	}
+
+	// ------------- Methods visible for testing ----------------
+
+	@VisibleForTesting
+	SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId() {
+		return assignmentsByCheckpointId;
+	}
+
+	@VisibleForTesting
+	Map<Integer, LinkedHashSet<SplitT>> assignmentsByCheckpointId(long checkpointId) {
+		return assignmentsByCheckpointId.get(checkpointId);
+	}
+
+	@VisibleForTesting
+	Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments() {
+		return uncheckpointedAssignments;
+	}
+
+	// -------------- private helpers ---------------
+
+	private void removeFromAssignment(
+			int subtaskId,
+			Map<Integer, LinkedHashSet<SplitT>> assignments,
+			List<SplitT> toPutBack) {
+		Set<SplitT> splitForSubtask = assignments.remove(subtaskId);
+		if (splitForSubtask != null) {
+			toPutBack.addAll(splitForSubtask);
+		}
+	}
+
+	private void addSplitAssignment(
+			SplitsAssignment<SplitT> additionalAssignment,
+			Map<Integer, LinkedHashSet<SplitT>> assignments) {
+		additionalAssignment.assignment().forEach((id, splits) ->
+				assignments.computeIfAbsent(id, ignored -> new LinkedHashSet<>()).addAll(splits));
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
similarity index 54%
copy from flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
index f899b12..5f6cabd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
@@ -16,36 +16,30 @@
  limitations under the License.
  */
 
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
-import java.io.Serializable;
+import java.util.List;
 
 /**
- * A container class hosting the information of a {@link SourceReader}.
+ * A source event that adds splits to a source reader.
+ *
+ * @param <SplitT> the type of splits.
  */
-@Public
-public final class ReaderInfo implements Serializable {
-	private final int subtaskId;
-	private final String location;
-
-	public ReaderInfo(int subtaskId, String location) {
-		this.subtaskId = subtaskId;
-		this.location = location;
+public class AddSplitEvent<SplitT> implements OperatorEvent {
+	private final List<SplitT> splits;
+
+	public AddSplitEvent(List<SplitT> splits) {
+		this.splits = splits;
 	}
 
-	/**
-	 * @return the ID of the subtask that runs the source reader.
-	 */
-	public int getSubtaskId() {
-		return subtaskId;
+	public List<SplitT> splits() {
+		return splits;
 	}
 
-	/**
-	 * @return the location of the subtask that runs this source reader.
-	 */
-	public String getLocation() {
-		return location;
+	@Override
+	public String toString() {
+		return String.format("AddSplitEvents[%s]", splits);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
similarity index 61%
copy from flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
index f899b12..4cb2aeb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
@@ -16,36 +16,33 @@
  limitations under the License.
  */
 
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.annotation.Public;
-
-import java.io.Serializable;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
- * A container class hosting the information of a {@link SourceReader}.
+ * An {@link OperatorEvent} that registers a {@link org.apache.flink.api.connector.source.SourceReader SourceReader}
+ * to the SourceCoordinator.
  */
-@Public
-public final class ReaderInfo implements Serializable {
+public class ReaderRegistrationEvent implements OperatorEvent {
 	private final int subtaskId;
 	private final String location;
 
-	public ReaderInfo(int subtaskId, String location) {
+	public ReaderRegistrationEvent(int subtaskId, String location) {
 		this.subtaskId = subtaskId;
 		this.location = location;
 	}
 
-	/**
-	 * @return the ID of the subtask that runs the source reader.
-	 */
-	public int getSubtaskId() {
+	public int subtaskId() {
 		return subtaskId;
 	}
 
-	/**
-	 * @return the location of the subtask that runs this source reader.
-	 */
-	public String getLocation() {
+	public String location() {
 		return location;
 	}
+
+	@Override
+	public String toString() {
+		return String.format("ReaderRegistrationEvent[subtaskId = %d, location = %s)", subtaskId, location);
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
similarity index 54%
copy from flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
index f899b12..0a5ffa3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
@@ -16,36 +16,30 @@
  limitations under the License.
  */
 
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.annotation.Public;
-
-import java.io.Serializable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
- * A container class hosting the information of a {@link SourceReader}.
+ * A wrapper operator event that contains a custom defined operator event.
  */
-@Public
-public final class ReaderInfo implements Serializable {
-	private final int subtaskId;
-	private final String location;
-
-	public ReaderInfo(int subtaskId, String location) {
-		this.subtaskId = subtaskId;
-		this.location = location;
+public class SourceEventWrapper implements OperatorEvent {
+	private final SourceEvent sourceEvent;
+
+	public SourceEventWrapper(SourceEvent sourceEvent) {
+		this.sourceEvent = sourceEvent;
 	}
 
 	/**
-	 * @return the ID of the subtask that runs the source reader.
+	 * @return The {@link SourceEvent} in this SourceEventWrapper.
 	 */
-	public int getSubtaskId() {
-		return subtaskId;
+	public SourceEvent getSourceEvent() {
+		return sourceEvent;
 	}
 
-	/**
-	 * @return the location of the subtask that runs this source reader.
-	 */
-	public String getLocation() {
-		return location;
+	@Override
+	public String toString() {
+		return String.format("SourceEventWrapper[%s]", sourceEvent);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
new file mode 100644
index 0000000..7928535
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -0,0 +1,105 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class MockOperatorCoordinatorContext implements OperatorCoordinator.Context {
+	private final OperatorID operatorID;
+	private final int numSubtasks;
+	private final boolean failEventSending;
+
+	private final Map<Integer, List<OperatorEvent>> eventsToOperator;
+	private final LinkedHashMap<Integer, Throwable> failedTasks;
+	private boolean jobFailed;
+
+	public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks) {
+		this(operatorID, numSubtasks, true);
+	}
+
+	public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks, boolean failEventSending) {
+		this.operatorID = operatorID;
+		this.numSubtasks = numSubtasks;
+		this.eventsToOperator = new HashMap<>();
+		this.failedTasks = new LinkedHashMap<>();
+		this.jobFailed = false;
+		this.failEventSending = failEventSending;
+	}
+
+	@Override
+	public OperatorID getOperatorId() {
+		return operatorID;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> sendEvent(
+			OperatorEvent evt,
+			int targetSubtask) throws TaskNotRunningException {
+		eventsToOperator.computeIfAbsent(targetSubtask, subtaskId -> new ArrayList<>()).add(evt);
+		if (failEventSending) {
+			CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+			future.completeExceptionally(new FlinkRuntimeException("Testing Exception to fail event sending."));
+			return future;
+		} else {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+	}
+
+	@Override
+	public void failTask(int subtask, Throwable cause) {
+		failedTasks.put(subtask, cause);
+	}
+
+	@Override
+	public void failJob(Throwable cause) {
+		jobFailed = true;
+	}
+
+	@Override
+	public int currentParallelism() {
+		return numSubtasks;
+	}
+
+	// -------------------------------
+
+	public List<OperatorEvent> getEventsToOperatorBySubtaskId(int subtaskId) {
+		return eventsToOperator.get(subtaskId);
+	}
+
+	public Map<Integer, List<OperatorEvent>> getEventsToOperator() {
+		return eventsToOperator;
+	}
+
+	public LinkedHashMap<Integer, Throwable> getFailedTasks() {
+		return failedTasks;
+	}
+
+	public boolean isJobFailed() {
+		return jobFailed;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
new file mode 100644
index 0000000..c2d6fb8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
@@ -0,0 +1,84 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * A util class containing the helper methods for the coordinator tests.
+ */
+class CoordinatorTestUtils {
+
+	/**
+	 * Create a SplitsAssignment. The assignments looks like following:
+	 * Subtask 0: Splits {0}
+	 * Subtask 1: Splits {1, 2}
+	 * Subtask 2: Splits {3, 4, 5}
+	 */
+	static SplitsAssignment<MockSourceSplit> getSplitsAssignment(int numSubtasks, int startingSplitId) {
+		Map<Integer, List<MockSourceSplit>> assignments = new HashMap<>();
+		int splitId = startingSplitId;
+		for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) {
+			List<MockSourceSplit> subtaskAssignment = new ArrayList<>();
+			for (int j = 0; j < subtaskIndex + 1; j++) {
+				subtaskAssignment.add(new MockSourceSplit(splitId++));
+			}
+			assignments.put(subtaskIndex, subtaskAssignment);
+		}
+		return new SplitsAssignment<>(assignments);
+	}
+
+	/**
+	 * Check the actual assignment meets the expectation.
+	 */
+	static void verifyAssignment(List<String> expectedSplitIds, Collection<MockSourceSplit> actualAssignment) {
+		assertEquals(expectedSplitIds.size(), actualAssignment.size());
+		int i = 0;
+		for (MockSourceSplit split : actualAssignment) {
+			assertEquals(expectedSplitIds.get(i++), split.splitId());
+		}
+	}
+
+	static void verifyException(ThrowingRunnable<Throwable> runnable, String failureMessage, String errorMessage) {
+		try {
+			runnable.run();
+			fail(failureMessage);
+		} catch (Throwable t) {
+			Throwable rootCause = t;
+			while (rootCause.getCause() != null) {
+				rootCause = rootCause.getCause();
+			}
+			assertThat(rootCause.getMessage(), Matchers.startsWith(errorMessage));
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
new file mode 100644
index 0000000..ba73ca0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -0,0 +1,188 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link SourceCoordinatorContext}.
+ */
+public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
+
+	@Test
+	public void testRegisterReader() {
+		List<ReaderInfo> readerInfo = registerReaders();
+
+		assertTrue(context.registeredReaders().containsKey(0));
+		assertTrue(context.registeredReaders().containsKey(1));
+		assertEquals(readerInfo.get(0), context.registeredReaders().get(0));
+		assertEquals(readerInfo.get(1), context.registeredReaders().get(1));
+	}
+
+	@Test
+	public void testUnregisterReader() {
+		List<ReaderInfo> readerInfo = registerReaders();
+		assertEquals(readerInfo.get(0), context.registeredReaders().get(0));
+
+		context.unregisterSourceReader(0);
+		assertEquals("Only reader 2 should be registered.", 2, context.registeredReaders().size());
+		assertNull(context.registeredReaders().get(0));
+		assertEquals(readerInfo.get(1), context.registeredReaders().get(1));
+		assertEquals(readerInfo.get(2), context.registeredReaders().get(2));
+	}
+
+	@Test
+	public void testAssignSplitsFromCoordinatorExecutor() throws ExecutionException, InterruptedException {
+		testAssignSplits(true);
+	}
+
+	@Test
+	public void testAssignSplitsFromOtherThread() throws ExecutionException, InterruptedException {
+		testAssignSplits(false);
+	}
+
+	private void testAssignSplits(boolean fromCoordinatorExecutor) throws ExecutionException, InterruptedException {
+		// Register the readers.
+		registerReaders();
+
+		// Assign splits to the readers.
+		SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+		if (fromCoordinatorExecutor) {
+			coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
+		} else {
+			context.assignSplits(splitsAssignment);
+		}
+
+		// The tracker should have recorded the assignments.
+		verifyAssignment(Arrays.asList("0"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
+		verifyAssignment(Arrays.asList("1", "2"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
+		// The OperatorCoordinatorContext should have received the event sending call.
+		assertEquals("There should be two events sent to the subtasks.",
+				2, operatorCoordinatorContext.getEventsToOperator().size());
+
+		// Assert the events to subtask0.
+		List<OperatorEvent> eventsToSubtask0 = operatorCoordinatorContext.getEventsToOperatorBySubtaskId(0);
+		assertEquals(1, eventsToSubtask0.size());
+		OperatorEvent event = eventsToSubtask0.get(0);
+		assertTrue(event instanceof AddSplitEvent);
+		verifyAssignment(Arrays.asList("0"), ((AddSplitEvent) event).splits());
+	}
+
+	@Test
+	public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() {
+		testAssignSplitToUnregisterdReader(true);
+	}
+
+	@Test
+	public void testAssignSplitToUnregisteredReaderFromOtherThread() {
+		testAssignSplitToUnregisterdReader(false);
+	}
+
+	private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor) {
+		// Assign splits to the readers.
+		SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+		verifyException(
+				() -> {
+					if (fromCoordinatorExecutor) {
+						coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
+					} else {
+						context.assignSplits(splitsAssignment);
+					}
+				},
+				"assignSpoits() should fail to assign the splits to a reader that is not registered.",
+				"Cannot assign splits");
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+		registerReaders();
+
+		// Assign splits to the readers.
+		SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+		coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
+		// Take the first snapshot;
+		byte[] bytes = takeSnapshot(context, 100L);
+
+		SourceCoordinatorContext<MockSourceSplit> restoredContext;
+		SplitAssignmentTracker<MockSourceSplit> restoredTracker = new SplitAssignmentTracker<>();
+		SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+				new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString());
+		try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+				DataInputStream in = new DataInputStream(bais)) {
+			restoredContext = new SourceCoordinatorContext<>(
+					coordinatorExecutor,
+					coordinatorThreadFactory,
+					1,
+					operatorCoordinatorContext,
+					restoredTracker);
+			restoredContext.restoreState(new MockSourceSplitSerializer(), in);
+		}
+		assertEquals(context.registeredReaders(), restoredContext.registeredReaders());
+		assertEquals(splitSplitAssignmentTracker.uncheckpointedAssignments(), restoredTracker.uncheckpointedAssignments());
+		assertEquals(splitSplitAssignmentTracker.assignmentsByCheckpointId(), restoredTracker.assignmentsByCheckpointId());
+
+	}
+
+	// ------------------------
+
+	private List<ReaderInfo> registerReaders() {
+		// Register the readers.
+		ReaderInfo readerInfo0 = new ReaderInfo(0, "subtask_0_location");
+		ReaderInfo readerInfo1 = new ReaderInfo(1, "subtask_1_location");
+		ReaderInfo readerInfo2 = new ReaderInfo(2, "subtask_1_location");
+		context.registerSourceReader(readerInfo0);
+		context.registerSourceReader(readerInfo1);
+		context.registerSourceReader(readerInfo2);
+		return Arrays.asList(readerInfo0, readerInfo1, readerInfo2);
+	}
+
+	private byte[] takeSnapshot(SourceCoordinatorContext<MockSourceSplit> context, long checkpointId) throws Exception {
+		byte[] bytes;
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+			context.snapshotState(checkpointId, new MockSourceSplitSerializer(), out);
+			out.flush();
+			bytes = baos.toByteArray();
+		}
+		return bytes;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
new file mode 100644
index 0000000..2f12235
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -0,0 +1,214 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
+
+	@Test
+	public void testThrowExceptionWhenNotStarted() {
+		// The following methods should only be invoked after the source coordinator has started.
+		String failureMessage = "Call should fail when source coordinator has not started yet.";
+		verifyException(() -> sourceCoordinator.checkpointComplete(100L),
+				failureMessage, "The coordinator has not started yet.");
+		verifyException(() -> sourceCoordinator.handleEventFromOperator(0, null),
+				failureMessage, "The coordinator has not started yet.");
+		verifyException(() -> sourceCoordinator.subtaskFailed(0),
+				failureMessage, "The coordinator has not started yet.");
+		verifyException(() -> sourceCoordinator.checkpointCoordinator(100L),
+				failureMessage, "The coordinator has not started yet.");
+	}
+
+	@Test
+	public void testRestCheckpointAfterCoordinatorStarted() throws Exception {
+		// The following methods should only be invoked after the source coordinator has started.
+		sourceCoordinator.start();
+		verifyException(() -> sourceCoordinator.resetToCheckpoint(null),
+				"Reset to checkpoint should fail after the coordinator has started",
+				String.format("The coordinator for source %s has started. The source coordinator state can " +
+						"only be reset to a checkpoint before it starts.", OPERATOR_NAME));
+	}
+
+	@Test
+	public void testStart() throws Exception {
+		assertFalse(enumerator.started());
+		sourceCoordinator.start();
+		assertTrue(enumerator.started());
+	}
+
+	@Test
+	public void testClosed() throws Exception {
+		assertFalse(enumerator.closed());
+		sourceCoordinator.start();
+		sourceCoordinator.close();
+		assertTrue(enumerator.closed());
+	}
+
+	@Test
+	public void testReaderRegistration() throws Exception {
+		sourceCoordinator.start();
+		sourceCoordinator.handleEventFromOperator(
+				0, new ReaderRegistrationEvent(0, "location_0"));
+		check(() -> {
+			assertEquals("2 splits should have been assigned to reader 0",
+					4, enumerator.getUnassignedSplits().size());
+			assertTrue(context.registeredReaders().containsKey(0));
+			assertTrue(enumerator.getHandledSourceEvent().isEmpty());
+			verifyAssignment(Arrays.asList("0", "3"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
+		});
+	}
+
+	@Test
+	public void testHandleSourceEvent() throws Exception {
+		sourceCoordinator.start();
+		SourceEvent sourceEvent = new SourceEvent() {};
+		sourceCoordinator.handleEventFromOperator(0, new SourceEventWrapper(sourceEvent));
+		check(() -> {
+			assertEquals(1, enumerator.getHandledSourceEvent().size());
+			assertEquals(sourceEvent, enumerator.getHandledSourceEvent().get(0));
+		});
+	}
+
+	@Test
+	public void testCheckpointCoordinatorAndRestore() throws Exception {
+		sourceCoordinator.start();
+		sourceCoordinator.handleEventFromOperator(
+				0, new ReaderRegistrationEvent(0, "location_0"));
+		byte[] bytes = sourceCoordinator.checkpointCoordinator(100L).get();
+
+		// restore from the checkpoints.
+		SourceCoordinator<?, ?> restoredCoordinator = getNewSourceCoordinator();
+		restoredCoordinator.resetToCheckpoint(bytes);
+		MockSplitEnumerator restoredEnumerator = (MockSplitEnumerator) restoredCoordinator.getEnumerator();
+		SourceCoordinatorContext restoredContext = restoredCoordinator.getContext();
+		assertEquals("2 splits should have been assigned to reader 0",
+				4, restoredEnumerator.getUnassignedSplits().size());
+		assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty());
+		assertEquals(1, restoredContext.registeredReaders().size());
+		assertTrue(restoredContext.registeredReaders().containsKey(0));
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
+		sourceCoordinator.start();
+
+		// Assign some splits to reader 0 then take snapshot 100.
+		sourceCoordinator.handleEventFromOperator(
+				0, new ReaderRegistrationEvent(0, "location_0"));
+		sourceCoordinator.checkpointCoordinator(100L).get();
+
+		// Add split 6, assign it to reader 0 and take another snapshot 101.
+		enumerator.addNewSplits(Collections.singletonList(new MockSourceSplit(6)));
+		sourceCoordinator.checkpointCoordinator(101L).get();
+
+		// check the state.
+		check(() -> {
+			// There should be 4 unassigned splits.
+			assertEquals(4, enumerator.getUnassignedSplits().size());
+			verifyAssignment(
+					Arrays.asList("0", "3"),
+					splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L).get(0));
+			assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
+			verifyAssignment(Arrays.asList("0", "3"), splitSplitAssignmentTracker.assignmentsByCheckpointId(100L).get(0));
+			verifyAssignment(Arrays.asList("6"), splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));
+
+			List<OperatorEvent> eventsToReader0 = operatorCoordinatorContext.getEventsToOperator().get(0);
+			assertEquals(2, eventsToReader0.size());
+			verifyAssignment(Arrays.asList("0", "3"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits());
+			verifyAssignment(Arrays.asList("6"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits());
+		});
+
+		// Fail reader 0.
+		sourceCoordinator.subtaskFailed(0);
+
+		// check the state again.
+		check(() -> {
+			//
+			assertFalse("Reader 0 should have been unregistered.",
+					context.registeredReaders().containsKey(0));
+			// The tracker should have reverted all the splits assignment to reader 0.
+			for (Map<Integer, ?> assignment : splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
+				assertFalse("Assignment in uncompleted checkpoint should have been reverted.",
+						assignment.containsKey(0));
+			}
+			assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
+			// The split enumerator should now contains the splits used to be assigned to reader 0.
+			assertEquals(7, enumerator.getUnassignedSplits().size());
+		});
+	}
+
+	@Test
+	public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
+		sourceCoordinator.start();
+
+		// Assign some splits to reader 0 then take snapshot 100.
+		sourceCoordinator.handleEventFromOperator(
+				0, new ReaderRegistrationEvent(0, "location_0"));
+		sourceCoordinator.checkpointCoordinator(100L).get();
+		// Complete checkpoint 100.
+		sourceCoordinator.checkpointComplete(100L);
+
+		// Fail reader 0.
+		sourceCoordinator.subtaskFailed(0);
+
+		check(() -> {
+			// Reader 0 hase been unregistered.
+			assertFalse(context.registeredReaders().containsKey(0));
+			// The assigned splits are not reverted.
+			assertEquals(4, enumerator.getUnassignedSplits().size());
+			assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
+			assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty());
+		});
+	}
+
+	// -------------------------------
+
+	private void check(Runnable runnable) {
+		try {
+			coordinatorExecutor.submit(runnable).get();
+		} catch (Exception e) {
+			fail("Test failed due to " + e);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
new file mode 100644
index 0000000..80a73bb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -0,0 +1,86 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The test base for SourceCoordinator related tests.
+ */
+public abstract class SourceCoordinatorTestBase {
+	protected static final String OPERATOR_NAME = "TestOperator";
+	protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
+	protected static final int NUM_SUBTASKS = 3;
+
+	protected ExecutorService coordinatorExecutor;
+	protected MockOperatorCoordinatorContext operatorCoordinatorContext;
+	protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
+	protected SourceCoordinatorContext<MockSourceSplit> context;
+	protected SourceCoordinator<?, ?> sourceCoordinator;
+	protected MockSplitEnumerator enumerator;
+
+	@Before
+	public void setup() {
+		operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS);
+		splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
+		String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
+		SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+				new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName);
+		coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+		context = new SourceCoordinatorContext<>(
+				coordinatorExecutor,
+				coordinatorThreadFactory,
+				1,
+				operatorCoordinatorContext,
+				splitSplitAssignmentTracker);
+		sourceCoordinator = getNewSourceCoordinator();
+		enumerator = (MockSplitEnumerator) sourceCoordinator.getEnumerator();
+	}
+
+	@After
+	public void cleanUp() throws InterruptedException, TimeoutException {
+		coordinatorExecutor.shutdown();
+		if (!coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+			throw new TimeoutException("Failed to close the CoordinatorExecutor before timeout.");
+		}
+	}
+
+	// --------------------------
+
+	protected SourceCoordinator getNewSourceCoordinator() {
+		Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
+				new MockSource(Boundedness.BOUNDED, NUM_SUBTASKS * 2);
+		return new SourceCoordinator<>(OPERATOR_NAME, coordinatorExecutor, mockSource, context);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
new file mode 100644
index 0000000..86ca76e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
@@ -0,0 +1,173 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for @link {@link SplitAssignmentTracker}.
+ */
+public class SplitAssignmentTrackerTest {
+
+	@Test
+	public void testRecordIncrementalSplitAssignment() {
+		SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+		tracker.recordSplitAssignment(getSplitsAssignment(3, 0));
+		tracker.recordSplitAssignment(getSplitsAssignment(2, 6));
+
+		verifyAssignment(Arrays.asList("0", "6"), tracker.uncheckpointedAssignments().get(0));
+		verifyAssignment(Arrays.asList("1", "2", "7", "8"), tracker.uncheckpointedAssignments().get(1));
+		verifyAssignment(Arrays.asList("3", "4", "5"), tracker.uncheckpointedAssignments().get(2));
+	}
+
+	@Test
+	public void testTakeSnapshot() throws Exception {
+		final long checkpointId = 123L;
+		SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+		tracker.recordSplitAssignment(getSplitsAssignment(3, 0));
+
+		// Serialize
+		takeSnapshot(tracker, checkpointId);
+
+		// Verify the uncheckpointed assignments.
+		assertTrue(tracker.uncheckpointedAssignments().isEmpty());
+
+		// verify assignments put into the checkpoints.
+		Map<Long, Map<Integer, LinkedHashSet<MockSourceSplit>>> assignmentsByCheckpoints =
+				tracker.assignmentsByCheckpointId();
+		assertEquals(1, assignmentsByCheckpoints.size());
+
+		Map<Integer, LinkedHashSet<MockSourceSplit>> assignmentForCheckpoint = assignmentsByCheckpoints.get(checkpointId);
+		assertNotNull(assignmentForCheckpoint);
+
+		verifyAssignment(Arrays.asList("0"), assignmentForCheckpoint.get(0));
+		verifyAssignment(Arrays.asList("1", "2"), assignmentForCheckpoint.get(1));
+		verifyAssignment(Arrays.asList("3", "4", "5"), assignmentForCheckpoint.get(2));
+	}
+
+	@Test
+	public void testRestore() throws Exception {
+		final long checkpointId = 123L;
+		SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+		tracker.recordSplitAssignment(getSplitsAssignment(1, 0));
+
+		// Serialize
+		byte[] bytes = takeSnapshot(tracker, checkpointId);
+
+		// Deserialize
+		SplitAssignmentTracker<MockSourceSplit> deserializedTracker = restoreSnapshot(bytes);
+		// Verify the restore was successful.
+		assertEquals(deserializedTracker.assignmentsByCheckpointId(), tracker.assignmentsByCheckpointId());
+		assertEquals(deserializedTracker.uncheckpointedAssignments(), tracker.uncheckpointedAssignments());
+	}
+
+	@Test
+	public void testOnCheckpointComplete() throws Exception {
+		final long checkpointId1 = 100L;
+		final long checkpointId2 = 101L;
+		SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+		// Assign some splits to subtask 0 and 1.
+		tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+
+		// Take the first snapshot.
+		takeSnapshot(tracker, checkpointId1);
+		verifyAssignment(Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
+		verifyAssignment(Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
+
+		// Assign additional splits to subtask 0 and 1.
+		tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+
+		// Take the second snapshot.
+		takeSnapshot(tracker, checkpointId2);
+		verifyAssignment(Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
+		verifyAssignment(Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
+		verifyAssignment(Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
+		verifyAssignment(Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
+
+		// Complete the first checkpoint.
+		tracker.onCheckpointComplete(checkpointId1);
+		assertNull(tracker.assignmentsByCheckpointId(checkpointId1));
+		verifyAssignment(Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
+		verifyAssignment(Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
+	}
+
+	@Test
+	public void testGetAndRemoveUncheckpointedAssignment() throws Exception {
+		final long checkpointId1 = 100L;
+		final long checkpointId2 = 101L;
+		SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+		// Assign some splits and take snapshot 1.
+		tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+		takeSnapshot(tracker, checkpointId1);
+
+		// Assign some more splits and take snapshot 2.
+		tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+		takeSnapshot(tracker, checkpointId2);
+
+		// Now assume subtask 0 has failed.
+		List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0);
+		verifyAssignment(Arrays.asList("0", "3"), splitsToPutBack);
+	}
+
+	// ---------------------
+
+	private byte[] takeSnapshot(SplitAssignmentTracker<MockSourceSplit> tracker, long checkpointId) throws Exception {
+		byte[] bytes;
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+			tracker.snapshotState(checkpointId, new MockSourceSplitSerializer(), out);
+			out.flush();
+			bytes = baos.toByteArray();
+		}
+		return bytes;
+	}
+
+	private SplitAssignmentTracker<MockSourceSplit> restoreSnapshot(byte[] bytes) throws Exception {
+		SplitAssignmentTracker<MockSourceSplit> deserializedTracker;
+		try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+				DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+			deserializedTracker = new SplitAssignmentTracker<>();
+			deserializedTracker.restoreState(new MockSourceSplitSerializer(), in);
+		}
+		return deserializedTracker;
+	}
+}