You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/05/05 15:20:04 UTC
[flink] branch master updated: [FLINK-15101][connector/common] Add
the SourceCoordinator implementation
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ed40049 [FLINK-15101][connector/common] Add the SourceCoordinator implementation
ed40049 is described below
commit ed400497e56ea272722ac71697edf830b2d682ae
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Mar 23 08:53:31 2020 +0800
[FLINK-15101][connector/common] Add the SourceCoordinator implementation
---
.../flink/api/connector/source/ReaderInfo.java | 15 ++
.../api/connector/source/SplitEnumerator.java | 3 +-
.../connector/source/SplitEnumeratorContext.java | 26 +-
.../api/connector/source/mocks/MockSource.java | 75 ++++++
.../connector/source/mocks/MockSourceSplit.java | 2 +-
.../source/mocks/MockSplitEnumerator.java | 139 +++++++++++
.../MockSplitEnumeratorCheckpointSerializer.java | 55 ++++
.../source/coordinator/ExecutorNotifier.java | 137 ++++++++++
.../source/coordinator/SourceCoordinator.java | 278 +++++++++++++++++++++
.../coordinator/SourceCoordinatorContext.java | 274 ++++++++++++++++++++
.../coordinator/SourceCoordinatorProvider.java | 112 +++++++++
.../coordinator/SourceCoordinatorSerdeUtils.java | 179 +++++++++++++
.../source/coordinator/SplitAssignmentTracker.java | 164 ++++++++++++
.../flink/runtime/source/event/AddSplitEvent.java | 38 ++-
.../source/event/ReaderRegistrationEvent.java | 29 +--
.../runtime/source/event/SourceEventWrapper.java | 36 ++-
.../MockOperatorCoordinatorContext.java | 105 ++++++++
.../source/coordinator/CoordinatorTestUtils.java | 84 +++++++
.../coordinator/SourceCoordinatorContextTest.java | 188 ++++++++++++++
.../source/coordinator/SourceCoordinatorTest.java | 214 ++++++++++++++++
.../coordinator/SourceCoordinatorTestBase.java | 86 +++++++
.../coordinator/SplitAssignmentTrackerTest.java | 173 +++++++++++++
22 files changed, 2341 insertions(+), 71 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index f899b12..4a048d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
import java.io.Serializable;
+import java.util.Objects;
/**
* A container class hosting the information of a {@link SourceReader}.
@@ -48,4 +49,18 @@ public final class ReaderInfo implements Serializable {
public String getLocation() {
return location;
}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(subtaskId, location);
+	}
+
+	/**
+	 * Two {@code ReaderInfo}s are equal when they have the same subtask id and the same location.
+	 * The location is compared null-safely, consistent with {@link #hashCode()}, which also
+	 * tolerates a null location.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof ReaderInfo)) {
+			return false;
+		}
+		ReaderInfo other = (ReaderInfo) obj;
+		return subtaskId == other.subtaskId && Objects.equals(location, other.location);
+	}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index bdaee36..16b3938 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -66,8 +66,9 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extend
* Checkpoints the state of this split enumerator.
*
* @return an object containing the state of the split enumerator.
+ * @throws Exception when the snapshot cannot be taken.
*/
- CheckpointT snapshotState();
+ CheckpointT snapshotState() throws Exception;
/**
* Called to close the enumerator, in case it holds on to any resources, like threads or
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 0dd004a..db6ccad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -48,11 +48,13 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
void sendEventToSourceReader(int subtaskId, SourceEvent event);
/**
- * Get the number of subtasks.
+ * Get the current parallelism of this Source. Note that due to auto-scaling, the parallelism
+ * may change over time. Therefore the SplitEnumerator should not cache the return value
+ * of this method, but always invoke this method to get the latest parallelism.
*
- * @return the number of subtasks.
+ * @return the parallelism of the Source.
*/
- int numSubtasks();
+ int currentParallelism();
/**
* Get the currently registered readers. The mapping is from subtask id to the reader info.
@@ -70,10 +72,12 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
/**
* Invoke the callable and handover the return value to the handler which will be executed
- * by the source coordinator.
+ * by the source coordinator. When this method is invoked multiple times, the <code>Callable</code>s
+ * may be executed in a thread pool concurrently.
*
- * <p>It is important to make sure that the callable should not modify
- * any shared state. Otherwise the there might be unexpected behavior.
+ * <p>It is important to make sure that the callable does not modify any shared state, especially
+ * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise,
+ * there might be unexpected behavior.
*
* @param callable a callable to call.
* @param handler a handler that handles the return value of or the exception thrown from the callable.
@@ -81,11 +85,13 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);
/**
- * Invoke the callable and handover the return value to the handler which will be executed
- * by the source coordinator.
+ * Invoke the given callable periodically and hand over the return value to the handler which will
+ * be executed by the source coordinator. When this method is invoked multiple times, the
+ * <code>Callable</code>s may be executed in a thread pool concurrently.
*
- * <p>It is important to make sure that the callable should not modify
- * any shared state. Otherwise the there might be unexpected behavior.
+ * <p>It is important to make sure that the callable does not modify any shared state, especially
+ * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise,
+ * there might be unexpected behavior.
*
* @param callable the callable to call.
* @param handler a handler that handles the return value of or the exception thrown from the callable.
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
new file mode 100644
index 0000000..f38ca60
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -0,0 +1,75 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A mock {@link Source} for unit tests.
+ *
+ * <p>It does not support creating readers; it only hands out {@link MockSplitEnumerator}s,
+ * either fresh ones seeded with {@code numSplits} splits or ones restored from a checkpoint.
+ */
+public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSourceSplit>> {
+
+	/** Reported verbatim by {@link #getBoundedness()}. */
+	private final Boundedness boundedness;
+
+	/** Number of splits a freshly created enumerator starts with. */
+	private final int numSplits;
+
+	public MockSource(Boundedness boundedness, int numSplits) {
+		this.numSplits = numSplits;
+		this.boundedness = boundedness;
+	}
+
+	@Override
+	public Boundedness getBoundedness() {
+		return this.boundedness;
+	}
+
+	/** Readers are not needed by the coordinator tests, so this always throws. */
+	@Override
+	public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext ctx) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(
+			SplitEnumeratorContext<MockSourceSplit> context) {
+		return new MockSplitEnumerator(this.numSplits, context);
+	}
+
+	/** Restores an enumerator whose unassigned splits are exactly the checkpointed ones. */
+	@Override
+	public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(
+			SplitEnumeratorContext<MockSourceSplit> context,
+			Set<MockSourceSplit> checkpoint) throws IOException {
+		return new MockSplitEnumerator(checkpoint, context);
+	}
+
+	@Override
+	public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
+		return new MockSourceSplitSerializer();
+	}
+
+	@Override
+	public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
+		return new MockSplitEnumeratorCheckpointSerializer();
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index 08a7ed8..9dbc9de 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -90,7 +90,7 @@ public class MockSourceSplit implements SourceSplit, Serializable {
@Override
public int hashCode() {
- return Objects.hash(id, records.toArray(new Integer[0]), endIndex, index);
+ // Hash the records by content: passed as-is, the array would contribute only its identity-based hash code.
+ return Objects.hash(id, Arrays.hashCode(records.toArray(new Integer[0])), endIndex, index);
}
@Override
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
new file mode 100644
index 0000000..64ec8af
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -0,0 +1,139 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A mock {@link SplitEnumerator} for unit tests.
+ *
+ * <p>A split whose numeric id is {@code i} belongs to subtask {@code i % currentParallelism()}.
+ * The unassigned splits are kept sorted by their numeric split id.
+ */
+public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
+	private final SortedSet<MockSourceSplit> unassignedSplits;
+	private final SplitEnumeratorContext<MockSourceSplit> enumContext;
+	private final List<SourceEvent> handledSourceEvent;
+	private boolean started;
+	private boolean closed;
+
+	/** Creates an enumerator seeded with one {@code MockSourceSplit(i)} for every i in [0, numSplits). */
+	public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) {
+		this(new HashSet<>(), enumContext);
+		for (int i = 0; i < numSplits; i++) {
+			unassignedSplits.add(new MockSourceSplit(i));
+		}
+	}
+
+	/** Creates an enumerator whose unassigned splits are a sorted copy of the given ones. */
+	public MockSplitEnumerator(
+			Set<MockSourceSplit> unassignedSplits,
+			SplitEnumeratorContext<MockSourceSplit> enumContext) {
+		this.unassignedSplits = new TreeSet<>(Comparator.comparingInt(o -> Integer.parseInt(o.splitId())));
+		this.unassignedSplits.addAll(unassignedSplits);
+		this.enumContext = enumContext;
+		this.handledSourceEvent = new ArrayList<>();
+		this.started = false;
+		this.closed = false;
+	}
+
+	@Override
+	public void start() {
+		this.started = true;
+	}
+
+	/** Only records the event so that tests can inspect it. */
+	@Override
+	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+		handledSourceEvent.add(sourceEvent);
+	}
+
+	@Override
+	public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
+		unassignedSplits.addAll(splits);
+	}
+
+	/** Assigns every unassigned split owned by the given subtask to it. */
+	@Override
+	public void addReader(int subtaskId) {
+		// Read the parallelism once per call; it is not cached across calls.
+		final int parallelism = enumContext.currentParallelism();
+		List<MockSourceSplit> assignment = new ArrayList<>();
+		for (MockSourceSplit split : unassignedSplits) {
+			if (Integer.parseInt(split.splitId()) % parallelism == subtaskId) {
+				assignment.add(split);
+			}
+		}
+		enumContext.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtaskId, assignment)));
+		unassignedSplits.removeAll(assignment);
+	}
+
+	/**
+	 * Returns a copy of the unassigned splits so the snapshot is not affected by later changes
+	 * to the live enumerator state. The copy keeps the split-id ordering.
+	 */
+	@Override
+	public Set<MockSourceSplit> snapshotState() {
+		return new TreeSet<>(unassignedSplits);
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.closed = true;
+	}
+
+	/** Adds the given splits and assigns all splits whose owning reader is registered. */
+	public void addNewSplits(List<MockSourceSplit> newSplits) {
+		unassignedSplits.addAll(newSplits);
+		assignAllSplits();
+	}
+
+	// --------------------
+
+	public boolean started() {
+		return started;
+	}
+
+	public boolean closed() {
+		return closed;
+	}
+
+	/** Returns the live set of unassigned splits (not a copy). */
+	public Set<MockSourceSplit> getUnassignedSplits() {
+		return unassignedSplits;
+	}
+
+	public List<SourceEvent> getHandledSourceEvent() {
+		return handledSourceEvent;
+	}
+
+	// --------------------
+
+	/** Assigns each unassigned split to its owning subtask, if a reader is registered for it. */
+	private void assignAllSplits() {
+		Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
+		unassignedSplits.forEach(split -> {
+			int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
+			if (enumContext.registeredReaders().containsKey(subtaskId)) {
+				assignment.computeIfAbsent(subtaskId, ignored -> new ArrayList<>()).add(split);
+			}
+		});
+		enumContext.assignSplits(new SplitsAssignment<>(assignment));
+		assignment.values().forEach(l -> unassignedSplits.removeAll(l));
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
new file mode 100644
index 0000000..4879b18
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
@@ -0,0 +1,55 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serializer for the checkpoint of {@link MockSplitEnumerator}.
+ *
+ * <p>The set of splits is written as a Java-serialized {@link ArrayList} and read back into a
+ * {@link HashSet}. Test-only: Java deserialization must never be applied to untrusted bytes.
+ */
+public class MockSplitEnumeratorCheckpointSerializer implements SimpleVersionedSerializer<Set<MockSourceSplit>> {
+
+	@Override
+	public int getVersion() {
+		return 0;
+	}
+
+	@Override
+	public byte[] serialize(Set<MockSourceSplit> obj) throws IOException {
+		return InstantiationUtil.serializeObject(new ArrayList<>(obj));
+	}
+
+	@Override
+	public Set<MockSourceSplit> deserialize(int version, byte[] serialized) throws IOException {
+		try {
+			ArrayList<MockSourceSplit> list = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+			return new HashSet<>(list);
+		} catch (ClassNotFoundException e) {
+			// Keep the original exception as the cause so the missing class is visible to the caller.
+			throw new FlinkRuntimeException("Failed to deserialize the enumerator checkpoint.", e);
+		}
+	}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
new file mode 100644
index 0000000..e730a15
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -0,0 +1,137 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * This class is used to coordinate between two components, where one component has an
+ * executor following the mailbox model and the other component notifies it when needed.
+ *
+ * <p>Callables are invoked in the {@code workerExecutor}. Their results, and also their failures,
+ * are always handed to the handler through the {@code executorToNotify}, so a handler never runs
+ * in the worker thread.
+ */
+public class ExecutorNotifier {
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
+	private final ScheduledExecutorService workerExecutor;
+	private final Executor executorToNotify;
+
+	public ExecutorNotifier(ScheduledExecutorService workerExecutor,
+			Executor executorToNotify) {
+		this.executorToNotify = executorToNotify;
+		this.workerExecutor = workerExecutor;
+	}
+
+	/**
+	 * Call the given callable once in the worker executor. Then notify the {@link #executorToNotify}
+	 * to execute the handler with either the result of the callable or the exception it threw.
+	 *
+	 * <p>Note that callables run in the worker executor while handlers run in the executor to
+	 * notify. So when this method is invoked multiple times, a callable may run concurrently with
+	 * the handler of an earlier invocation. For example, even if both the workerExecutor and the
+	 * executorToNotify are single threaded, the following code is not thread-safe.
+	 *
+	 * <pre>{@code
+	 * final List<Integer> list = new ArrayList<>();
+	 *
+	 * // The callable adds an integer 1 to the list. While this seems to work at first glance,
+	 * // the callable and the handler of an earlier call may modify the list at the same time.
+	 * notifier.notifyReadyAsync(
+	 *     () -> list.add(1),
+	 *     (ignoredValue, ignoredThrowable) -> list.add(2));
+	 * }</pre>
+	 *
+	 * <p>Instead, the above logic should be implemented as:
+	 * <pre>{@code
+	 * // Modify the state in the handler.
+	 * notifier.notifyReadyAsync(() -> 1, (v, ignoredThrowable) -> {
+	 *     list.add(v);
+	 *     list.add(2);
+	 * });
+	 * }</pre>
+	 *
+	 * @param callable the callable to invoke before notifying the executor.
+	 * @param handler the handler to handle the result of, or the exception thrown by, the callable.
+	 */
+	public <T> void notifyReadyAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+		workerExecutor.execute(() -> callAndNotify(callable, handler));
+	}
+
+	/**
+	 * Call the given callable periodically, at a fixed rate, in the worker executor. After each
+	 * invocation, notify the {@link #executorToNotify} to execute the handler with either the
+	 * result of the callable or the exception it threw.
+	 *
+	 * <p>The same caveat as for {@link #notifyReadyAsync(Callable, BiConsumer)} applies: the callable
+	 * runs concurrently with handlers, so state should only be modified in the handler.
+	 *
+	 * <p>If the executor to notify rejects a notification, the exception escapes the scheduled task
+	 * and, per {@link ScheduledExecutorService#scheduleAtFixedRate} semantics, subsequent executions
+	 * are suppressed.
+	 *
+	 * @param callable the callable to execute before notifying the executor to notify.
+	 * @param handler the handler that handles the result from the callable.
+	 * @param initialDelayMs the initial delay in ms before invoking the given callable.
+	 * @param periodMs the interval in ms to invoke the callable.
+	 */
+	public <T> void notifyReadyAsync(
+			Callable<T> callable,
+			BiConsumer<T, Throwable> handler,
+			long initialDelayMs,
+			long periodMs) {
+		workerExecutor.scheduleAtFixedRate(
+			() -> callAndNotify(callable, handler),
+			initialDelayMs,
+			periodMs,
+			TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Invokes the callable in the current (worker) thread and dispatches exactly one handler call
+	 * to the executor to notify. Failures of the dispatch itself are not fed back into the handler.
+	 */
+	private <T> void callAndNotify(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+		final T result;
+		try {
+			result = callable.call();
+		} catch (Throwable t) {
+			// Pass the throwable as the last argument without a placeholder so the stack trace is logged.
+			LOG.error("Unexpected exception while invoking the callable.", t);
+			executorToNotify.execute(() -> handler.accept(null, t));
+			return;
+		}
+		executorToNotify.execute(() -> handler.accept(result, null));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
new file mode 100644
index 0000000..4fc2aeb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -0,0 +1,278 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContext} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls the corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);
+	/** The name of the operator this SourceCoordinator is associated with. */
+	private final String operatorName;
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. Replaced on checkpoint restore. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			String operatorName,
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.operatorName = operatorName;
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator for source {}.", operatorName);
+		enumerator.start();
+		started = true;
+	}
+
+	/**
+	 * Closes the enumerator (if it was started) and shuts down the coordinator executor.
+	 *
+	 * @throws TimeoutException if the executor does not terminate within 10 seconds.
+	 */
+	@Override
+	public void close() throws Exception {
+		LOG.info("Closing SourceCoordinator for source {}.", operatorName);
+		boolean successfullyClosed = false;
+		try {
+			if (started) {
+				enumerator.close();
+			}
+		} finally {
+			coordinatorExecutor.shutdownNow();
+			// We do not expect this to actually block for long. At this point, there should be very few task running
+			// in the executor, if any.
+			successfullyClosed = coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
+		}
+		if (!successfullyClosed) {
+			throw new TimeoutException("The source coordinator failed to close before timeout.");
+		}
+		LOG.info("Source coordinator for source {} closed.", operatorName);
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			try {
+				LOG.debug("Handling event from subtask {} of source {}: {}", subtask, operatorName, event);
+				if (event instanceof SourceEventWrapper) {
+					enumerator.handleSourceEvent(subtask, ((SourceEventWrapper) event).getSourceEvent());
+				} else if (event instanceof ReaderRegistrationEvent) {
+					handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+				} else {
+					// Do not drop unknown events silently; they usually indicate a protocol mismatch.
+					LOG.warn("Ignoring unrecognized event {} from subtask {} of source {}.",
+						event, subtask, operatorName);
+				}
+			} catch (Exception e) {
+				LOG.error("Failing the job due to exception when handling operator event {} from subtask {} " +
+						"of source {}.", event, subtask, operatorName, e);
+				context.failJob(e);
+			}
+		});
+	}
+
+	/** Hands the splits assigned to the failed subtask since the last completed checkpoint back to the enumerator. */
+	@Override
+	public void subtaskFailed(int subtaskId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			try {
+				LOG.info("Handling subtask {} failure of source {}.", subtaskId, operatorName);
+				List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId);
+				context.unregisterSourceReader(subtaskId);
+				LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, operatorName);
+				enumerator.addSplitsBack(splitsToAddBack, subtaskId);
+			} catch (Exception e) {
+				LOG.error("Failing the job due to exception when handling subtask {} failure in source {}.",
+						subtaskId, operatorName, e);
+				context.failJob(e);
+			}
+		});
+	}
+
+	/** Serializes the coordinator state on the coordinator executor; failures complete the future exceptionally. */
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+		ensureStarted();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);
+				return toBytes(checkpointId);
+			} catch (Exception e) {
+				throw new CompletionException(
+					String.format("Failed to checkpoint coordinator for source %s.", operatorName), e);
+			}
+		}, coordinatorExecutor);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			try {
+				LOG.info("Marking checkpoint {} as completed for source {}.", checkpointId, operatorName);
+				context.onCheckpointComplete(checkpointId);
+			} catch (Exception e) {
+				LOG.error("Failing the job due to exception when completing the checkpoint {} for source {}.",
+						checkpointId, operatorName, e);
+				context.failJob(e);
+			}
+		});
+	}
+
+	/**
+	 * Restores the coordinator state from the given checkpoint bytes. Only allowed before
+	 * {@link #start()}; the enumerator created in the constructor has therefore never been
+	 * started and is replaced by the one restored from the checkpoint.
+	 *
+	 * @throws IllegalStateException if the coordinator has already started.
+	 */
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		if (started) {
+			throw new IllegalStateException(String.format(
+				"The coordinator for source %s has started. The source coordinator state can " +
+				"only be reset to a checkpoint before it starts.", operatorName));
+		}
+		LOG.info("Resetting coordinator of source {} from checkpoint.", operatorName);
+		fromBytes(checkpointData);
+	}
+
+	// ---------------------------------------------------
+	@VisibleForTesting
+	SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
+		return enumerator;
+	}
+
+	@VisibleForTesting
+	SourceCoordinatorContext<SplitT> getContext() {
+		return context;
+	}
+
+	// --------------------- Serde -----------------------
+
+	/**
+	 * Serialize the coordinator state. The current implementation may not be super efficient,
+	 * but it should not matter that much because most of the state should be rather small.
+	 * Large states themselves may already be a problem regardless of how the serialization
+	 * is implemented.
+	 *
+	 * <p>Layout: coordinator serde version, enumerator serializer version, length of the
+	 * serialized enumerator checkpoint, its bytes, then the context state.
+	 *
+	 * @param checkpointId the id of the checkpoint being taken.
+	 * @return A byte array containing the serialized state of the source coordinator.
+	 * @throws Exception When something goes wrong in serialization.
+	 */
+	private byte[] toBytes(long checkpointId) throws Exception {
+		EnumChkT enumCkpt = enumerator.snapshotState();
+
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+			writeCoordinatorSerdeVersion(out);
+			out.writeInt(enumCheckpointSerializer.getVersion());
+			byte[] serializedEnumChkpt = enumCheckpointSerializer.serialize(enumCkpt);
+			out.writeInt(serializedEnumChkpt.length);
+			out.write(serializedEnumChkpt);
+			context.snapshotState(checkpointId, splitSerializer, out);
+			out.flush();
+			return baos.toByteArray();
+		}
+	}
+
+	/**
+	 * Restore the state of this source coordinator from the state bytes, reading the same layout
+	 * that {@link #toBytes(long)} writes, and replace the enumerator with a restored one.
+	 *
+	 * @param bytes The checkpoint bytes that was returned from {@link #toBytes(long)}
+	 * @throws Exception When the deserialization failed.
+	 */
+	private void fromBytes(byte[] bytes) throws Exception {
+		try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+				DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+			readAndVerifyCoordinatorSerdeVersion(in);
+			int enumSerializerVersion = in.readInt();
+			int serializedEnumChkptSize = in.readInt();
+			byte[] serializedEnumChkpt = readBytes(in, serializedEnumChkptSize);
+			EnumChkT enumChkpt = enumCheckpointSerializer.deserialize(enumSerializerVersion, serializedEnumChkpt);
+			context.restoreState(splitSerializer, in);
+			enumerator = source.restoreEnumerator(context, enumChkpt);
+		}
+	}
+
+	// --------------------- private methods -------------
+
+	/** Registers the reader in the context and then tells the enumerator about it. */
+	private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
+		context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
+		enumerator.addReader(event.subtaskId());
+	}
+
+	private void ensureStarted() {
+		if (!started) {
+			throw new IllegalStateException("The coordinator has not started yet.");
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
new file mode 100644
index 0000000..47fe564
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -0,0 +1,274 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeRegisteredReaders;
+
+/**
+ * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext} this class
+ * allows interaction with state and sending {@link OperatorEvent} to the SourceOperator while
+ * {@link SplitEnumeratorContext} only allows sending {@link SourceEvent}.
+ *
+ * <p>The context serves a few purposes:
+ * <ul>
+ * <li>
+ * Information provider - The context provides necessary information to the enumerator for it to
+ * know what is the status of the source readers and their split assignments. This information
+ * allows the split enumerator to do the coordination.
+ * </li>
+ * <li>
+ * Action taker - The context also provides a few actions that the enumerator can take to carry
+ * out the coordination. So far there are two actions: 1) assign splits to the source readers,
+ * and 2) send custom {@link SourceEvent SourceEvents} to the source readers.
+ * </li>
+ * <li>
+ * Thread model enforcement - The context ensures that split assignments are carried out by the
+ * coordinator thread, see {@link #assignSplits(SplitsAssignment)}.
+ * </li>
+ * </ul>
+ * @param <SplitT> the type of the splits.
+ */
+@Internal
+public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+ private final ExecutorService coordinatorExecutor;
+ private final ExecutorNotifier notifier;
+ private final OperatorCoordinator.Context operatorCoordinatorContext;
+ private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+ private final SplitAssignmentTracker<SplitT> assignmentTracker;
+ private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+ private final String coordinatorThreadName;
+
+ /**
+ * Creates a context that tracks split assignments with a fresh {@link SplitAssignmentTracker}.
+ *
+ * @param coordinatorExecutor the executor that runs the coordinator logic; split assignments
+ * requested from other threads are forwarded to it.
+ * @param coordinatorThreadFactory the factory that created the coordinator thread, used to check
+ * whether the current thread is the coordinator thread.
+ * @param numWorkerThreads the size of the worker thread pool handed to the {@link ExecutorNotifier}.
+ * @param operatorCoordinatorContext the context used to send events to subtasks and to fail the job.
+ */
+ public SourceCoordinatorContext(
+ ExecutorService coordinatorExecutor,
+ SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+ int numWorkerThreads,
+ OperatorCoordinator.Context operatorCoordinatorContext) {
+ this(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, operatorCoordinatorContext,
+ new SplitAssignmentTracker<>());
+ }
+
+ // Package private method for unit test.
+ SourceCoordinatorContext(
+ ExecutorService coordinatorExecutor,
+ SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+ int numWorkerThreads,
+ OperatorCoordinator.Context operatorCoordinatorContext,
+ SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+ this.coordinatorExecutor = coordinatorExecutor;
+ this.coordinatorThreadFactory = coordinatorThreadFactory;
+ this.operatorCoordinatorContext = operatorCoordinatorContext;
+ this.registeredReaders = new ConcurrentHashMap<>();
+ this.assignmentTracker = splitAssignmentTracker;
+ this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
+ // Worker threads are named after the coordinator thread to make them easy to identify.
+ this.notifier = new ExecutorNotifier(
+ Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
+ private int index = 0;
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, coordinatorThreadName + "-worker-" + index++);
+ }
+ }),
+ coordinatorExecutor);
+ }
+
+ @Override
+ public MetricGroup metricGroup() {
+ // NOTE(review): metrics are not wired up yet; callers receive null.
+ return null;
+ }
+
+ /**
+ * Sends a custom {@link SourceEvent} to the source reader of the given subtask.
+ *
+ * @throws FlinkRuntimeException if the target task is not running.
+ */
+ @Override
+ public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+ try {
+ operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+ } catch (TaskNotRunningException e) {
+ throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
+ event,
+ subtaskId), e);
+ }
+ }
+
+ @Override
+ public int currentParallelism() {
+ return operatorCoordinatorContext.currentParallelism();
+ }
+
+ /** Returns an unmodifiable live view of the registered readers, keyed by subtask id. */
+ @Override
+ public Map<Integer, ReaderInfo> registeredReaders() {
+ return Collections.unmodifiableMap(registeredReaders);
+ }
+
+ /**
+ * Assigns splits to the source readers. When called from a thread other than the coordinator
+ * thread, the call is re-submitted to the coordinator executor and this method blocks until it
+ * has completed.
+ *
+ * @throws IllegalArgumentException if a subtask in the assignment has not registered a reader.
+ * @throws FlinkRuntimeException if the assignment fails, is interrupted, or an event cannot be sent.
+ */
+ @Override
+ public void assignSplits(SplitsAssignment<SplitT> assignment) {
+ // Ensure the split assignment is done by the coordinator executor.
+ if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+ try {
+ coordinatorExecutor.submit(() -> assignSplits(assignment)).get();
+ return;
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so that callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw new FlinkRuntimeException("Interrupted while assigning splits.", e);
+ } catch (ExecutionException e) {
+ throw new FlinkRuntimeException("Failed to assign splits.", e);
+ }
+ }
+
+ // Ensure all the subtasks in the assignment have registered.
+ for (Integer subtaskId : assignment.assignment().keySet()) {
+ if (!registeredReaders.containsKey(subtaskId)) {
+ // Report the splits that were meant for the subtask; the reader lookup would always be null here.
+ throw new IllegalArgumentException(String.format(
+ "Cannot assign splits %s to subtask %d because the subtask is not registered.",
+ assignment.assignment().get(subtaskId), subtaskId));
+ }
+ }
+
+ // Record the assignment first so that it can be returned to the enumerator if a reader fails.
+ assignmentTracker.recordSplitAssignment(assignment);
+ assignment.assignment().forEach(
+ (id, splits) -> {
+ try {
+ operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+ } catch (TaskNotRunningException e) {
+ throw new FlinkRuntimeException(String.format(
+ "Failed to assign splits %s to reader %d.", splits, id), e);
+ }
+ });
+ }
+
+ @Override
+ public <T> void callAsync(
+ Callable<T> callable,
+ BiConsumer<T, Throwable> handler,
+ long initialDelay,
+ long period) {
+ notifier.notifyReadyAsync(callable, handler, initialDelay, period);
+ }
+
+ @Override
+ public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+ notifier.notifyReadyAsync(callable, handler);
+ }
+
+ // --------- Package private additional methods for the SourceCoordinator ------------
+
+ /**
+ * Fail the job with the given cause.
+ *
+ * @param cause the cause of the job failure.
+ */
+ void failJob(Throwable cause) {
+ operatorCoordinatorContext.failJob(cause);
+ }
+
+ /**
+ * Take a snapshot of this SourceCoordinatorContext.
+ *
+ * @param checkpointId The id of the ongoing checkpoint.
+ * @param splitSerializer The serializer of the splits.
+ * @param out The stream to which the registered readers and the split assignments are written.
+ * @throws Exception when the serialization fails.
+ */
+ void snapshotState(
+ long checkpointId,
+ SimpleVersionedSerializer<SplitT> splitSerializer,
+ DataOutputStream out) throws Exception {
+ writeRegisteredReaders(registeredReaders, out);
+ assignmentTracker.snapshotState(checkpointId, splitSerializer, out);
+ }
+
+ /**
+ * Restore the state of the context. The registered readers are replaced by the restored ones.
+ *
+ * @param splitSerializer the serializer for the SourceSplits.
+ * @param in the input from which the states are read.
+ * @throws Exception when the restoration failed.
+ */
+ void restoreState(
+ SimpleVersionedSerializer<SplitT> splitSerializer,
+ DataInputStream in) throws Exception {
+ Map<Integer, ReaderInfo> readers = readRegisteredReaders(in);
+ registeredReaders.clear();
+ registeredReaders.putAll(readers);
+ assignmentTracker.restoreState(splitSerializer, in);
+ }
+
+ /**
+ * Register a source reader. A previous registration of the same subtask is overwritten.
+ *
+ * @param readerInfo the reader information of the source reader.
+ */
+ void registerSourceReader(ReaderInfo readerInfo) {
+ registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+ }
+
+ /**
+ * Unregister a source reader.
+ *
+ * @param subtaskId the subtask id of the source reader.
+ * @throws NullPointerException if no reader is registered for the subtask.
+ */
+ void unregisterSourceReader(int subtaskId) {
+ // The message template is only formatted when the check actually fails.
+ Preconditions.checkNotNull(registeredReaders.remove(subtaskId),
+ "Failed to unregister source reader of id %s because it is not registered.", subtaskId);
+ }
+
+ /**
+ * Get the split to put back. This only happens when a source reader subtask has failed.
+ *
+ * @param failedSubtaskId the failed subtask id.
+ * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
+ */
+ List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
+ return assignmentTracker.getAndRemoveUncheckpointedAssignment(failedSubtaskId);
+ }
+
+ /**
+ * Invoked when a successful checkpoint has been taken.
+ *
+ * @param checkpointId the id of the successful checkpoint.
+ */
+ void onCheckpointComplete(long checkpointId) {
+ assignmentTracker.onCheckpointComplete(checkpointId);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
new file mode 100644
index 0000000..77ad7a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -0,0 +1,112 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+/**
+ * The provider of {@link SourceCoordinator}. Each created coordinator gets its own single-threaded
+ * coordinator executor, whose thread is named {@code "SourceCoordinator-" + operatorName}.
+ */
+public class SourceCoordinatorProvider<SplitT extends SourceSplit>
+ implements OperatorCoordinator.Provider {
+ private final String operatorName;
+ private final OperatorID operatorID;
+ private final Source<?, SplitT, ?> source;
+ private final int numWorkerThreads;
+
+ /**
+ * Construct the {@link SourceCoordinatorProvider}.
+ *
+ * @param operatorName the name of the operator; it is also used to name the coordinator thread.
+ * @param operatorID the ID of the operator this coordinator corresponds to.
+ * @param source the Source that will be used for this coordinator.
+ * @param numWorkerThreads the number of threads the coordinator should provide to the SplitEnumerator
+ * for doing async calls. See
+ * {@link org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync(Callable, BiConsumer)
+ * SplitEnumeratorContext.callAsync()}.
+ */
+ public SourceCoordinatorProvider(
+ String operatorName,
+ OperatorID operatorID,
+ Source<?, SplitT, ?> source,
+ int numWorkerThreads) {
+ this.operatorName = operatorName;
+ this.operatorID = operatorID;
+ this.source = source;
+ this.numWorkerThreads = numWorkerThreads;
+ }
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ @Override
+ public OperatorCoordinator create(OperatorCoordinator.Context context) {
+ final CoordinatorExecutorThreadFactory threadFactory =
+ new CoordinatorExecutorThreadFactory("SourceCoordinator-" + operatorName);
+ final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
+ return new SourceCoordinator<>(
+ operatorName,
+ executor,
+ source,
+ new SourceCoordinatorContext<SplitT>(executor, threadFactory, numWorkerThreads, context));
+ }
+
+ /**
+ * A {@link ThreadFactory} that creates the single coordinator thread and remembers it, so that
+ * callers can check whether they are running on the coordinator thread.
+ */
+ public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
+ private final String coordinatorThreadName;
+ // The only thread this factory ever creates; null until newThread() is first called.
+ private Thread coordinatorThread;
+
+ CoordinatorExecutorThreadFactory(String coordinatorThreadName) {
+ this.coordinatorThreadName = coordinatorThreadName;
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ if (coordinatorThread == null) {
+ coordinatorThread = new Thread(runnable, coordinatorThreadName);
+ coordinatorThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+ return coordinatorThread;
+ }
+ throw new IllegalStateException(
+ "Should never happen. This factory should only be used by a SingleThreadExecutor.");
+ }
+
+ String getCoordinatorThreadName() {
+ return coordinatorThreadName;
+ }
+
+ boolean isCurrentThreadCoordinatorThread() {
+ return coordinatorThread == Thread.currentThread();
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
new file mode 100644
index 0000000..2855b2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+
+/**
+ * A serialization util class for the {@link SourceCoordinator}.
+ */
+public final class SourceCoordinatorSerdeUtils {
+ /** The current source coordinator serde version. */
+ private static final int CURRENT_VERSION = 0;
+
+ /** Private constructor for utility class. */
+ private SourceCoordinatorSerdeUtils() {}
+
+ /** Write the current serde version. */
+ static void writeCoordinatorSerdeVersion(DataOutputStream out) throws IOException {
+ out.writeInt(CURRENT_VERSION);
+ }
+
+ /**
+ * Read and verify the serde version.
+ *
+ * @throws IOException if the version is negative (never written by this class) or newer than
+ * {@link #CURRENT_VERSION}.
+ */
+ static void readAndVerifyCoordinatorSerdeVersion(DataInputStream in) throws IOException {
+ int version = in.readInt();
+ if (version < 0 || version > CURRENT_VERSION) {
+ throw new IOException("Unsupported source coordinator serde version " + version);
+ }
+ }
+
+ /**
+ * Serialize the registered readers map.
+ *
+ * <p>The binary format is following:
+ * 4 Bytes - num entries.
+ * N Bytes - entries, each being one reader info, see {@link #writeReaderInfo(ReaderInfo, DataOutputStream)}.
+ * The map keys are not written separately because each reader info carries its own subtask id.
+ */
+ static void writeRegisteredReaders(Map<Integer, ReaderInfo> registeredReaders, DataOutputStream out) throws IOException {
+ out.writeInt(registeredReaders.size());
+ for (ReaderInfo info : registeredReaders.values()) {
+ writeReaderInfo(info, out);
+ }
+ }
+
+ /**
+ * Deserialize the registered readers map written by
+ * {@link #writeRegisteredReaders(Map, DataOutputStream)}. The map is keyed by the subtask id
+ * of each reader info.
+ */
+ static Map<Integer, ReaderInfo> readRegisteredReaders(DataInputStream in) throws IOException {
+ int numReaders = in.readInt();
+ Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
+ for (int i = 0; i < numReaders; i++) {
+ ReaderInfo info = readReaderInfo(in);
+ registeredReaders.put(info.getSubtaskId(), info);
+ }
+ return registeredReaders;
+ }
+
+ /**
+ * Serialize the assignment by checkpoint ids.
+ *
+ * <p>The binary format is following:
+ * 4 Bytes - split serializer version.
+ * 4 Bytes - num checkpoints.
+ * For each checkpoint:
+ * 8 Bytes - checkpoint id.
+ * 4 Bytes - num subtasks.
+ * For each subtask:
+ * 4 Bytes - subtask id.
+ * 4 Bytes - num assigned splits.
+ * For each split: 4 Bytes - serialized split size, N Bytes - serialized split.
+ */
+ static <SplitT> void writeAssignmentsByCheckpointId(
+ Map<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentByCheckpointIds,
+ SimpleVersionedSerializer<SplitT> splitSerializer,
+ DataOutputStream out) throws IOException {
+ // SplitSerializer version.
+ out.writeInt(splitSerializer.getVersion());
+ // Num checkpoints.
+ out.writeInt(assignmentByCheckpointIds.size());
+ for (Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> assignments : assignmentByCheckpointIds.entrySet()) {
+ long checkpointId = assignments.getKey();
+ out.writeLong(checkpointId);
+
+ int numSubtasks = assignments.getValue().size();
+ out.writeInt(numSubtasks);
+ for (Map.Entry<Integer, LinkedHashSet<SplitT>> assignment : assignments.getValue().entrySet()) {
+ int subtaskId = assignment.getKey();
+ out.writeInt(subtaskId);
+
+ int numAssignedSplits = assignment.getValue().size();
+ out.writeInt(numAssignedSplits);
+ for (SplitT split : assignment.getValue()) {
+ byte[] serializedSplit = splitSerializer.serialize(split);
+ out.writeInt(serializedSplit.length);
+ out.write(serializedSplit);
+ }
+ }
+ }
+ }
+
+ /**
+ * Deserialize the assignment by checkpoint ids written by
+ * {@link #writeAssignmentsByCheckpointId(Map, SimpleVersionedSerializer, DataOutputStream)}.
+ * The split order within each subtask is preserved.
+ */
+ static <SplitT> Map<Long, Map<Integer, LinkedHashSet<SplitT>>> readAssignmentsByCheckpointId(
+ DataInputStream in,
+ SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+ int splitSerializerVersion = in.readInt();
+ int numCheckpoints = in.readInt();
+ Map<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointIds = new HashMap<>(numCheckpoints);
+ for (int i = 0; i < numCheckpoints; i++) {
+ long checkpointId = in.readLong();
+ int numSubtasks = in.readInt();
+ Map<Integer, LinkedHashSet<SplitT>> assignments = new HashMap<>();
+ assignmentsByCheckpointIds.put(checkpointId, assignments);
+ for (int j = 0; j < numSubtasks; j++) {
+ int subtaskId = in.readInt();
+ int numAssignedSplits = in.readInt();
+ LinkedHashSet<SplitT> splits = new LinkedHashSet<>(numAssignedSplits);
+ assignments.put(subtaskId, splits);
+ for (int k = 0; k < numAssignedSplits; k++) {
+ int serializedSplitSize = in.readInt();
+ byte[] serializedSplit = readBytes(in, serializedSplitSize);
+ SplitT split = splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
+ splits.add(split);
+ }
+ }
+ }
+ return assignmentsByCheckpointIds;
+ }
+
+ /**
+ * Read exactly {@code size} bytes from the input.
+ *
+ * @throws BufferUnderflowException if the input ends before {@code size} bytes were read.
+ */
+ static byte[] readBytes(DataInputStream in, int size) throws IOException {
+ byte[] bytes = new byte[size];
+ int off = 0;
+ // For ByteArrayInputStream the read should succeed with one shot.
+ while (off < size) {
+ int read = in.read(bytes, off, size - off);
+ if (read < 0) {
+ throw new BufferUnderflowException();
+ }
+ off += read;
+ }
+ return bytes;
+ }
+
+ // ----- private helper methods -----
+
+ /**
+ * Serialize {@link ReaderInfo}.
+ *
+ * <p>The binary format is following:
+ * 4 Bytes - subtask id
+ * N Bytes - location string, in the modified UTF-8 format of {@link DataOutputStream#writeUTF(String)}
+ *
+ * @param readerInfo the given reader information to serialize.
+ */
+ private static void writeReaderInfo(ReaderInfo readerInfo, DataOutputStream out) throws IOException {
+ out.writeInt(readerInfo.getSubtaskId());
+ out.writeUTF(readerInfo.getLocation());
+ }
+
+ /** Deserialize a {@link ReaderInfo} written by {@link #writeReaderInfo(ReaderInfo, DataOutputStream)}. */
+ private static ReaderInfo readReaderInfo(DataInputStream in) throws IOException {
+ int subtaskId = in.readInt();
+ String location = in.readUTF();
+ return new ReaderInfo(subtaskId, location);
+ }
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
new file mode 100644
index 0000000..7985f7d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
@@ -0,0 +1,164 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAssignmentsByCheckpointId;
+import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeAssignmentsByCheckpointId;
+
+/**
+ * A class that is responsible for tracking the past split assignments made by
+ * {@link SplitEnumerator}.
+ */
+@Internal
+public class SplitAssignmentTracker<SplitT extends SourceSplit> {
+ // All the split assignments since the last successful checkpoint.
+ // Maintaining this allow the subtasks to fail over independently.
+ // The mapping is [CheckpointId -> [SubtaskId -> LinkedHashSet[SourceSplits]]].
+ private final SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId;
+ // The split assignments since the last checkpoint attempt.
+ // The mapping is [SubtaskId -> LinkedHashSet[SourceSplits]].
+ private Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments;
+
+ public SplitAssignmentTracker() {
+ this.assignmentsByCheckpointId = new TreeMap<>();
+ this.uncheckpointedAssignments = new HashMap<>();
+ }
+
+ /**
+ * Take a snapshot of the uncheckpointed split assignments.
+ *
+ * <p>The assignments recorded since the previous snapshot are filed under {@code checkpointId},
+ * then all assignments not yet covered by a completed checkpoint are written to {@code out}.
+ *
+ * @param checkpointId the id of the ongoing checkpoint.
+ * @param splitSerializer the serializer of the splits.
+ * @param out the stream to which the assignments are written.
+ * @throws Exception when the serialization fails.
+ */
+ public void snapshotState(
+ long checkpointId,
+ SimpleVersionedSerializer<SplitT> splitSerializer,
+ DataOutputStream out) throws Exception {
+ // Include the uncheckpointed assignments to the snapshot.
+ assignmentsByCheckpointId.put(checkpointId, uncheckpointedAssignments);
+ uncheckpointedAssignments = new HashMap<>();
+ writeAssignmentsByCheckpointId(assignmentsByCheckpointId, splitSerializer, out);
+ }
+
+ /**
+ * Restore the state of the SplitAssignmentTracker. Any state held before the restore is discarded.
+ *
+ * @param splitSerializer The serializer of the splits.
+ * @param in The DataInputStream that contains the state of the SplitAssignmentTracker.
+ * @throws Exception when the state deserialization fails.
+ */
+ public void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in) throws Exception {
+ // Read the split assignments by checkpoint id before touching the current state, so that a
+ // failed deserialization leaves the tracker unchanged.
+ Map<Long, Map<Integer, LinkedHashSet<SplitT>>> deserializedAssignments =
+ readAssignmentsByCheckpointId(in, splitSerializer);
+ // Replace rather than merge: assignments recorded before the restore must not be handed
+ // back to the enumerator when a reader fails later on.
+ assignmentsByCheckpointId.clear();
+ assignmentsByCheckpointId.putAll(deserializedAssignments);
+ uncheckpointedAssignments = new HashMap<>();
+ }
+
+ /**
+ * When a checkpoint has been successfully made, this method is invoked to clean up the assignment
+ * history up to and including this successful checkpoint.
+ *
+ * @param checkpointId the id of the successful checkpoint.
+ */
+ public void onCheckpointComplete(long checkpointId) {
+ assignmentsByCheckpointId.entrySet().removeIf(entry -> entry.getKey() <= checkpointId);
+ }
+
+ /**
+ * Record a new split assignment.
+ *
+ * @param splitsAssignment the new split assignment.
+ */
+ public void recordSplitAssignment(SplitsAssignment<SplitT> splitsAssignment) {
+ addSplitAssignment(splitsAssignment, uncheckpointedAssignments);
+ }
+
+ /**
+ * This method is invoked when a source reader fails over. In this case, the source reader will
+ * restore its split assignment to the last successful checkpoint. Any split assignment to that
+ * source reader after the last successful checkpoint will be lost on the source reader side as
+ * if those splits were never assigned. To handle this case, the coordinator needs to find those
+ * splits and return them back to the SplitEnumerator for re-assignment.
+ *
+ * @param failedSubtaskId the failed subtask id.
+ * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
+ */
+ public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
+ List<SplitT> splits = new ArrayList<>();
+ assignmentsByCheckpointId.values().forEach(assignments -> {
+ removeFromAssignment(failedSubtaskId, assignments, splits);
+ });
+ removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
+ return splits;
+ }
+
+ // ------------- Methods visible for testing ----------------
+
+ @VisibleForTesting
+ SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId() {
+ return assignmentsByCheckpointId;
+ }
+
+ @VisibleForTesting
+ Map<Integer, LinkedHashSet<SplitT>> assignmentsByCheckpointId(long checkpointId) {
+ return assignmentsByCheckpointId.get(checkpointId);
+ }
+
+ @VisibleForTesting
+ Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments() {
+ return uncheckpointedAssignments;
+ }
+
+ // -------------- private helpers ---------------
+
+ /** Moves the splits of the given subtask out of {@code assignments} and appends them to {@code toPutBack}. */
+ private void removeFromAssignment(
+ int subtaskId,
+ Map<Integer, LinkedHashSet<SplitT>> assignments,
+ List<SplitT> toPutBack) {
+ Set<SplitT> splitForSubtask = assignments.remove(subtaskId);
+ if (splitForSubtask != null) {
+ toPutBack.addAll(splitForSubtask);
+ }
+ }
+
+ /** Merges {@code additionalAssignment} into {@code assignments}, keeping the insertion order of splits. */
+ private void addSplitAssignment(
+ SplitsAssignment<SplitT> additionalAssignment,
+ Map<Integer, LinkedHashSet<SplitT>> assignments) {
+ additionalAssignment.assignment().forEach((id, splits) ->
+ assignments.computeIfAbsent(id, ignored -> new LinkedHashSet<>()).addAll(splits));
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
similarity index 54%
copy from flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
index f899b12..5f6cabd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
@@ -16,36 +16,30 @@
limitations under the License.
*/
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
-import org.apache.flink.annotation.Public;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import java.io.Serializable;
+import java.util.List;
/**
- * A container class hosting the information of a {@link SourceReader}.
+ * An {@link OperatorEvent} sent to a source reader to add splits to it.
+ *
+ * @param <SplitT> the type of splits.
*/
-@Public
-public final class ReaderInfo implements Serializable {
- private final int subtaskId;
- private final String location;
-
- public ReaderInfo(int subtaskId, String location) {
- this.subtaskId = subtaskId;
- this.location = location;
+public class AddSplitEvent<SplitT> implements OperatorEvent {
+ private final List<SplitT> splits;
+
+ /**
+ * @param splits the splits to add to the source reader. The list is stored as given, not copied.
+ */
+ public AddSplitEvent(List<SplitT> splits) {
+ this.splits = splits;
}
- /**
- * @return the ID of the subtask that runs the source reader.
- */
- public int getSubtaskId() {
- return subtaskId;
+ /**
+ * @return the splits to add to the source reader.
+ */
+ public List<SplitT> splits() {
+ return splits;
}
- /**
- * @return the location of the subtask that runs this source reader.
- */
- public String getLocation() {
- return location;
+ @Override
+ public String toString() {
+ return String.format("AddSplitEvents[%s]", splits);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
similarity index 61%
copy from flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
index f899b12..4cb2aeb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
@@ -16,36 +16,33 @@
limitations under the License.
*/
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
-import org.apache.flink.annotation.Public;
-
-import java.io.Serializable;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
/**
- * A container class hosting the information of a {@link SourceReader}.
+ * An {@link OperatorEvent} that registers a {@link org.apache.flink.api.connector.source.SourceReader SourceReader}
+ * to the SourceCoordinator.
*/
-@Public
-public final class ReaderInfo implements Serializable {
+public class ReaderRegistrationEvent implements OperatorEvent {
private final int subtaskId;
private final String location;
- public ReaderInfo(int subtaskId, String location) {
+ public ReaderRegistrationEvent(int subtaskId, String location) {
this.subtaskId = subtaskId;
this.location = location;
}
- /**
- * @return the ID of the subtask that runs the source reader.
- */
- public int getSubtaskId() {
+ /**
+ * @return the ID of the subtask that runs the registering source reader.
+ */
+ public int subtaskId() {
return subtaskId;
}
- /**
- * @return the location of the subtask that runs this source reader.
- */
- public String getLocation() {
+ /**
+ * @return the location of the subtask that runs the registering source reader.
+ */
+ public String location() {
return location;
}
+
+ @Override
+ public String toString() {
+ return String.format("ReaderRegistrationEvent[subtaskId = %d, location = %s]", subtaskId, location);
+ }
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
similarity index 54%
copy from flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
index f899b12..0a5ffa3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
@@ -16,36 +16,30 @@
limitations under the License.
*/
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
-import org.apache.flink.annotation.Public;
-
-import java.io.Serializable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
/**
- * A container class hosting the information of a {@link SourceReader}.
+ * An {@link OperatorEvent} that wraps a custom-defined {@link SourceEvent}.
*/
-@Public
-public final class ReaderInfo implements Serializable {
- private final int subtaskId;
- private final String location;
-
- public ReaderInfo(int subtaskId, String location) {
- this.subtaskId = subtaskId;
- this.location = location;
+public class SourceEventWrapper implements OperatorEvent {
+ private final SourceEvent sourceEvent;
+
+ /**
+ * @param sourceEvent the source event to wrap.
+ */
+ public SourceEventWrapper(SourceEvent sourceEvent) {
+ this.sourceEvent = sourceEvent;
}
/**
- * @return the ID of the subtask that runs the source reader.
+ * @return The {@link SourceEvent} in this SourceEventWrapper.
*/
- public int getSubtaskId() {
- return subtaskId;
+ public SourceEvent getSourceEvent() {
+ return sourceEvent;
}
- /**
- * @return the location of the subtask that runs this source reader.
- */
- public String getLocation() {
- return location;
+ @Override
+ public String toString() {
+ return String.format("SourceEventWrapper[%s]", sourceEvent);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
new file mode 100644
index 0000000..7928535
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -0,0 +1,105 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A mock {@link OperatorCoordinator.Context} for tests. It records the events sent to each subtask,
+ * the subtasks that were failed together with their causes, and whether (and why) the job was failed.
+ *
+ * <p>With the two-argument constructor, every {@link #sendEvent(OperatorEvent, int)} call still records
+ * the event but returns a future that is completed exceptionally, simulating a failed event delivery.
+ */
+public class MockOperatorCoordinatorContext implements OperatorCoordinator.Context {
+ private final OperatorID operatorID;
+ private final int numSubtasks;
+ private final boolean failEventSending;
+
+ private final Map<Integer, List<OperatorEvent>> eventsToOperator;
+ private final LinkedHashMap<Integer, Throwable> failedTasks;
+ private boolean jobFailed;
+ private Throwable jobFailureReason;
+
+ /**
+ * Creates a mock context whose event sending always fails.
+ *
+ * @param operatorID the ID returned by {@link #getOperatorId()}.
+ * @param numSubtasks the parallelism returned by {@link #currentParallelism()}.
+ */
+ public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks) {
+ this(operatorID, numSubtasks, true);
+ }
+
+ /**
+ * @param operatorID the ID returned by {@link #getOperatorId()}.
+ * @param numSubtasks the parallelism returned by {@link #currentParallelism()}.
+ * @param failEventSending whether the futures returned by {@link #sendEvent(OperatorEvent, int)} are
+ * completed exceptionally instead of with an acknowledgement.
+ */
+ public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks, boolean failEventSending) {
+ this.operatorID = operatorID;
+ this.numSubtasks = numSubtasks;
+ this.eventsToOperator = new HashMap<>();
+ this.failedTasks = new LinkedHashMap<>();
+ this.jobFailed = false;
+ this.jobFailureReason = null;
+ this.failEventSending = failEventSending;
+ }
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> sendEvent(
+ OperatorEvent evt,
+ int targetSubtask) throws TaskNotRunningException {
+ // The event is recorded even when sending is configured to fail.
+ eventsToOperator.computeIfAbsent(targetSubtask, subtaskId -> new ArrayList<>()).add(evt);
+ if (failEventSending) {
+ CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+ future.completeExceptionally(new FlinkRuntimeException("Testing Exception to fail event sending."));
+ return future;
+ } else {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+ }
+
+ @Override
+ public void failTask(int subtask, Throwable cause) {
+ failedTasks.put(subtask, cause);
+ }
+
+ @Override
+ public void failJob(Throwable cause) {
+ jobFailed = true;
+ // Keep the cause so tests can assert on why the job was failed.
+ jobFailureReason = cause;
+ }
+
+ @Override
+ public int currentParallelism() {
+ return numSubtasks;
+ }
+
+ // -------------------------------
+
+ /**
+ * @return the events sent to the given subtask in sending order, or {@code null} if none was sent.
+ */
+ public List<OperatorEvent> getEventsToOperatorBySubtaskId(int subtaskId) {
+ return eventsToOperator.get(subtaskId);
+ }
+
+ /**
+ * @return the recorded events, keyed by target subtask ID.
+ */
+ public Map<Integer, List<OperatorEvent>> getEventsToOperator() {
+ return eventsToOperator;
+ }
+
+ /**
+ * @return the failed subtasks and their failure causes, in the order the subtasks were first failed.
+ */
+ public LinkedHashMap<Integer, Throwable> getFailedTasks() {
+ return failedTasks;
+ }
+
+ /**
+ * @return whether {@link #failJob(Throwable)} has been called.
+ */
+ public boolean isJobFailed() {
+ return jobFailed;
+ }
+
+ /**
+ * @return the cause passed to the most recent {@link #failJob(Throwable)} call, or {@code null} if the
+ * job has not been failed.
+ */
+ public Throwable getJobFailureReason() {
+ return jobFailureReason;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
new file mode 100644
index 0000000..c2d6fb8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
@@ -0,0 +1,84 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * A util class containing the helper methods for the coordinator tests.
+ */
+class CoordinatorTestUtils {
+
+ private CoordinatorTestUtils() {
+ // Utility class, not meant to be instantiated.
+ }
+
+ /**
+ * Create a SplitsAssignment in which subtask {@code i} is assigned {@code i + 1} splits with consecutive
+ * IDs, starting from {@code startingSplitId}. For example, with 3 subtasks and a starting split ID of 0,
+ * the assignment looks like the following:
+ * <pre>
+ * Subtask 0: Splits {0}
+ * Subtask 1: Splits {1, 2}
+ * Subtask 2: Splits {3, 4, 5}
+ * </pre>
+ */
+ static SplitsAssignment<MockSourceSplit> getSplitsAssignment(int numSubtasks, int startingSplitId) {
+ Map<Integer, List<MockSourceSplit>> assignments = new HashMap<>();
+ int splitId = startingSplitId;
+ for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) {
+ List<MockSourceSplit> subtaskAssignment = new ArrayList<>();
+ for (int j = 0; j < subtaskIndex + 1; j++) {
+ subtaskAssignment.add(new MockSourceSplit(splitId++));
+ }
+ assignments.put(subtaskIndex, subtaskAssignment);
+ }
+ return new SplitsAssignment<>(assignments);
+ }
+
+ /**
+ * Check that the actual assignment contains exactly the expected split IDs, in iteration order.
+ */
+ static void verifyAssignment(List<String> expectedSplitIds, Collection<MockSourceSplit> actualAssignment) {
+ assertEquals(expectedSplitIds.size(), actualAssignment.size());
+ int i = 0;
+ for (MockSourceSplit split : actualAssignment) {
+ assertEquals(expectedSplitIds.get(i++), split.splitId());
+ }
+ }
+
+ /**
+ * Check that running the given runnable throws, and that the message of the root cause of the thrown
+ * exception starts with {@code errorMessage}.
+ *
+ * @param failureMessage the message to fail the test with if the runnable completes without throwing.
+ * @param errorMessage the expected prefix of the root cause's message.
+ */
+ static void verifyException(ThrowingRunnable<Throwable> runnable, String failureMessage, String errorMessage) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ Throwable rootCause = t;
+ while (rootCause.getCause() != null) {
+ rootCause = rootCause.getCause();
+ }
+ assertThat(rootCause.getMessage(), Matchers.startsWith(errorMessage));
+ return;
+ }
+ // Deliberately outside the try block: the AssertionError thrown by fail() must not be caught above.
+ fail(failureMessage);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
new file mode 100644
index 0000000..ba73ca0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -0,0 +1,188 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link SourceCoordinatorContext}.
+ */
+public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
+
+ @Test
+ public void testRegisterReader() {
+ List<ReaderInfo> readerInfo = registerReaders();
+
+ assertTrue(context.registeredReaders().containsKey(0));
+ assertTrue(context.registeredReaders().containsKey(1));
+ assertTrue(context.registeredReaders().containsKey(2));
+ assertEquals(readerInfo.get(0), context.registeredReaders().get(0));
+ assertEquals(readerInfo.get(1), context.registeredReaders().get(1));
+ assertEquals(readerInfo.get(2), context.registeredReaders().get(2));
+ }
+
+ @Test
+ public void testUnregisterReader() {
+ List<ReaderInfo> readerInfo = registerReaders();
+ assertEquals(readerInfo.get(0), context.registeredReaders().get(0));
+
+ context.unregisterSourceReader(0);
+ assertEquals("Only readers 1 and 2 should be registered.", 2, context.registeredReaders().size());
+ assertNull(context.registeredReaders().get(0));
+ assertEquals(readerInfo.get(1), context.registeredReaders().get(1));
+ assertEquals(readerInfo.get(2), context.registeredReaders().get(2));
+ }
+
+ @Test
+ public void testAssignSplitsFromCoordinatorExecutor() throws ExecutionException, InterruptedException {
+ testAssignSplits(true);
+ }
+
+ @Test
+ public void testAssignSplitsFromOtherThread() throws ExecutionException, InterruptedException {
+ testAssignSplits(false);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void testAssignSplits(boolean fromCoordinatorExecutor) throws ExecutionException, InterruptedException {
+ // Register the readers.
+ registerReaders();
+
+ // Assign splits to the readers: subtask 0 gets split 0, subtask 1 gets splits 1 and 2.
+ SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+ if (fromCoordinatorExecutor) {
+ coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
+ } else {
+ context.assignSplits(splitsAssignment);
+ }
+
+ // The tracker should have recorded the assignments.
+ verifyAssignment(Arrays.asList("0"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
+ verifyAssignment(Arrays.asList("1", "2"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
+ // The OperatorCoordinatorContext should have received the event sending call.
+ assertEquals("There should be two events sent to the subtasks.",
+ 2, operatorCoordinatorContext.getEventsToOperator().size());
+
+ // Assert the events to subtask0.
+ List<OperatorEvent> eventsToSubtask0 = operatorCoordinatorContext.getEventsToOperatorBySubtaskId(0);
+ assertEquals(1, eventsToSubtask0.size());
+ OperatorEvent event = eventsToSubtask0.get(0);
+ assertTrue(event instanceof AddSplitEvent);
+ verifyAssignment(Arrays.asList("0"), ((AddSplitEvent<MockSourceSplit>) event).splits());
+ }
+
+ @Test
+ public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() {
+ testAssignSplitToUnregisteredReader(true);
+ }
+
+ @Test
+ public void testAssignSplitToUnregisteredReaderFromOtherThread() {
+ testAssignSplitToUnregisteredReader(false);
+ }
+
+ private void testAssignSplitToUnregisteredReader(boolean fromCoordinatorExecutor) {
+ // Assign splits without registering any reader first; the assignment must be rejected.
+ SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+ verifyException(
+ () -> {
+ if (fromCoordinatorExecutor) {
+ coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
+ } else {
+ context.assignSplits(splitsAssignment);
+ }
+ },
+ "assignSplits() should fail to assign the splits to a reader that is not registered.",
+ "Cannot assign splits");
+ }
+
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+ registerReaders();
+
+ // Assign splits to the readers.
+ SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+ coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
+ // Take a snapshot for checkpoint 100.
+ byte[] bytes = takeSnapshot(context, 100L);
+
+ // Restore the snapshot into a fresh context backed by a fresh tracker.
+ SourceCoordinatorContext<MockSourceSplit> restoredContext;
+ SplitAssignmentTracker<MockSourceSplit> restoredTracker = new SplitAssignmentTracker<>();
+ SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+ new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString());
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputStream(bais)) {
+ restoredContext = new SourceCoordinatorContext<>(
+ coordinatorExecutor,
+ coordinatorThreadFactory,
+ 1,
+ operatorCoordinatorContext,
+ restoredTracker);
+ restoredContext.restoreState(new MockSourceSplitSerializer(), in);
+ }
+ assertEquals(context.registeredReaders(), restoredContext.registeredReaders());
+ assertEquals(splitSplitAssignmentTracker.uncheckpointedAssignments(), restoredTracker.uncheckpointedAssignments());
+ assertEquals(splitSplitAssignmentTracker.assignmentsByCheckpointId(), restoredTracker.assignmentsByCheckpointId());
+ }
+
+ // ------------------------
+
+ /**
+ * Registers readers for subtasks 0, 1 and 2 and returns their {@link ReaderInfo}s in subtask order.
+ */
+ private List<ReaderInfo> registerReaders() {
+ // Register the readers.
+ ReaderInfo readerInfo0 = new ReaderInfo(0, "subtask_0_location");
+ ReaderInfo readerInfo1 = new ReaderInfo(1, "subtask_1_location");
+ ReaderInfo readerInfo2 = new ReaderInfo(2, "subtask_2_location");
+ context.registerSourceReader(readerInfo0);
+ context.registerSourceReader(readerInfo1);
+ context.registerSourceReader(readerInfo2);
+ return Arrays.asList(readerInfo0, readerInfo1, readerInfo2);
+ }
+
+ /**
+ * Snapshots the given context for the given checkpoint and returns the serialized bytes.
+ */
+ private byte[] takeSnapshot(SourceCoordinatorContext<MockSourceSplit> context, long checkpointId) throws Exception {
+ byte[] bytes;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+ context.snapshotState(checkpointId, new MockSourceSplitSerializer(), out);
+ // Flush before reading the buffer so that all written bytes are included.
+ out.flush();
+ bytes = baos.toByteArray();
+ }
+ return bytes;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
new file mode 100644
index 0000000..2f12235
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -0,0 +1,214 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
+
+ @Test
+ public void testThrowExceptionWhenNotStarted() {
+ // The following methods should only be invoked after the source coordinator has started.
+ String failureMessage = "Call should fail when source coordinator has not started yet.";
+ verifyException(() -> sourceCoordinator.checkpointComplete(100L),
+ failureMessage, "The coordinator has not started yet.");
+ verifyException(() -> sourceCoordinator.handleEventFromOperator(0, null),
+ failureMessage, "The coordinator has not started yet.");
+ verifyException(() -> sourceCoordinator.subtaskFailed(0),
+ failureMessage, "The coordinator has not started yet.");
+ verifyException(() -> sourceCoordinator.checkpointCoordinator(100L),
+ failureMessage, "The coordinator has not started yet.");
+ }
+
+ @Test
+ public void testRestCheckpointAfterCoordinatorStarted() throws Exception {
+ // resetToCheckpoint() may only be invoked before the source coordinator starts, so it must fail here.
+ sourceCoordinator.start();
+ verifyException(() -> sourceCoordinator.resetToCheckpoint(null),
+ "Reset to checkpoint should fail after the coordinator has started",
+ String.format("The coordinator for source %s has started. The source coordinator state can " +
+ "only be reset to a checkpoint before it starts.", OPERATOR_NAME));
+ }
+
+ @Test
+ public void testStart() throws Exception {
+ assertFalse(enumerator.started());
+ sourceCoordinator.start();
+ assertTrue(enumerator.started());
+ }
+
+ @Test
+ public void testClosed() throws Exception {
+ assertFalse(enumerator.closed());
+ sourceCoordinator.start();
+ sourceCoordinator.close();
+ assertTrue(enumerator.closed());
+ }
+
+ @Test
+ public void testReaderRegistration() throws Exception {
+ sourceCoordinator.start();
+ sourceCoordinator.handleEventFromOperator(
+ 0, new ReaderRegistrationEvent(0, "location_0"));
+ check(() -> {
+ assertEquals("4 splits should remain unassigned after 2 splits are assigned to reader 0",
+ 4, enumerator.getUnassignedSplits().size());
+ assertTrue(context.registeredReaders().containsKey(0));
+ assertTrue(enumerator.getHandledSourceEvent().isEmpty());
+ verifyAssignment(Arrays.asList("0", "3"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
+ });
+ }
+
+ @Test
+ public void testHandleSourceEvent() throws Exception {
+ sourceCoordinator.start();
+ SourceEvent sourceEvent = new SourceEvent() {};
+ sourceCoordinator.handleEventFromOperator(0, new SourceEventWrapper(sourceEvent));
+ check(() -> {
+ assertEquals(1, enumerator.getHandledSourceEvent().size());
+ assertEquals(sourceEvent, enumerator.getHandledSourceEvent().get(0));
+ });
+ }
+
+ @Test
+ public void testCheckpointCoordinatorAndRestore() throws Exception {
+ sourceCoordinator.start();
+ sourceCoordinator.handleEventFromOperator(
+ 0, new ReaderRegistrationEvent(0, "location_0"));
+ byte[] bytes = sourceCoordinator.checkpointCoordinator(100L).get();
+
+ // Restore from the checkpoint.
+ // NOTE(review): getNewSourceCoordinator() builds the restored coordinator on the same `context` instance
+ // as the original one, so the registered-reader assertions below may pass even without a real restore.
+ // Consider restoring into a fresh SourceCoordinatorContext.
+ SourceCoordinator<?, ?> restoredCoordinator = getNewSourceCoordinator();
+ restoredCoordinator.resetToCheckpoint(bytes);
+ MockSplitEnumerator restoredEnumerator = (MockSplitEnumerator) restoredCoordinator.getEnumerator();
+ SourceCoordinatorContext restoredContext = restoredCoordinator.getContext();
+ assertEquals("4 splits should remain unassigned after 2 splits are assigned to reader 0",
+ 4, restoredEnumerator.getUnassignedSplits().size());
+ assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty());
+ assertEquals(1, restoredContext.registeredReaders().size());
+ assertTrue(restoredContext.registeredReaders().containsKey(0));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
+ sourceCoordinator.start();
+
+ // Assign some splits to reader 0 then take snapshot 100.
+ sourceCoordinator.handleEventFromOperator(
+ 0, new ReaderRegistrationEvent(0, "location_0"));
+ sourceCoordinator.checkpointCoordinator(100L).get();
+
+ // Add split 6, assign it to reader 0 and take another snapshot 101.
+ enumerator.addNewSplits(Collections.singletonList(new MockSourceSplit(6)));
+ sourceCoordinator.checkpointCoordinator(101L).get();
+
+ // check the state.
+ check(() -> {
+ // There should be 4 unassigned splits.
+ assertEquals(4, enumerator.getUnassignedSplits().size());
+ verifyAssignment(
+ Arrays.asList("0", "3"),
+ splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L).get(0));
+ assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
+ verifyAssignment(Arrays.asList("0", "3"), splitSplitAssignmentTracker.assignmentsByCheckpointId(100L).get(0));
+ verifyAssignment(Arrays.asList("6"), splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));
+
+ List<OperatorEvent> eventsToReader0 = operatorCoordinatorContext.getEventsToOperator().get(0);
+ assertEquals(2, eventsToReader0.size());
+ verifyAssignment(Arrays.asList("0", "3"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits());
+ verifyAssignment(Arrays.asList("6"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits());
+ });
+
+ // Fail reader 0.
+ sourceCoordinator.subtaskFailed(0);
+
+ // check the state again.
+ check(() -> {
+ // The failed reader should no longer be registered.
+ assertFalse("Reader 0 should have been unregistered.",
+ context.registeredReaders().containsKey(0));
+ // The tracker should have reverted all the splits assignment to reader 0.
+ for (Map<Integer, ?> assignment : splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
+ assertFalse("Assignment in uncompleted checkpoint should have been reverted.",
+ assignment.containsKey(0));
+ }
+ assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
+ // The split enumerator should now contain the splits that used to be assigned to reader 0.
+ assertEquals(7, enumerator.getUnassignedSplits().size());
+ });
+ }
+
+ @Test
+ public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
+ sourceCoordinator.start();
+
+ // Assign some splits to reader 0 then take snapshot 100.
+ sourceCoordinator.handleEventFromOperator(
+ 0, new ReaderRegistrationEvent(0, "location_0"));
+ sourceCoordinator.checkpointCoordinator(100L).get();
+ // Complete checkpoint 100.
+ sourceCoordinator.checkpointComplete(100L);
+
+ // Fail reader 0.
+ sourceCoordinator.subtaskFailed(0);
+
+ check(() -> {
+ // Reader 0 has been unregistered.
+ assertFalse(context.registeredReaders().containsKey(0));
+ // The assigned splits are not reverted.
+ assertEquals(4, enumerator.getUnassignedSplits().size());
+ assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
+ assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty());
+ });
+ }
+
+ // -------------------------------
+
+ /**
+ * Runs the given checks on the single-threaded coordinator executor and waits for them to complete.
+ * An assertion failure raised by the checks is rethrown as-is, so its message and stack trace are kept.
+ */
+ private void check(Runnable runnable) {
+ try {
+ coordinatorExecutor.submit(runnable).get();
+ } catch (InterruptedException e) {
+ // Preserve the interrupt status for the test runner.
+ Thread.currentThread().interrupt();
+ fail("Interrupted while waiting for the checks to finish.");
+ } catch (Exception e) {
+ // Future.get() wraps failures of the submitted task in an ExecutionException; unwrap it.
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
+ if (cause instanceof AssertionError) {
+ throw (AssertionError) cause;
+ }
+ throw new AssertionError("Test failed due to " + cause, cause);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
new file mode 100644
index 0000000..80a73bb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -0,0 +1,86 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The test base for SourceCoordinator related tests.
+ */
+public abstract class SourceCoordinatorTestBase {
+ protected static final String OPERATOR_NAME = "TestOperator";
+ protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
+ protected static final int NUM_SUBTASKS = 3;
+
+ // Single-threaded executor running the coordinator logic; tests also submit their checks to it.
+ protected ExecutorService coordinatorExecutor;
+ protected MockOperatorCoordinatorContext operatorCoordinatorContext;
+ protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
+ protected SourceCoordinatorContext<MockSourceSplit> context;
+ protected SourceCoordinator<?, ?> sourceCoordinator;
+ protected MockSplitEnumerator enumerator;
+
+ @Before
+ public void setup() {
+ operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS);
+ splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
+ String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
+ SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+ new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName);
+ coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+ context = new SourceCoordinatorContext<>(
+ coordinatorExecutor,
+ coordinatorThreadFactory,
+ 1,
+ operatorCoordinatorContext,
+ splitSplitAssignmentTracker);
+ sourceCoordinator = getNewSourceCoordinator();
+ enumerator = (MockSplitEnumerator) sourceCoordinator.getEnumerator();
+ }
+
+ @After
+ public void cleanUp() throws InterruptedException, TimeoutException {
+ coordinatorExecutor.shutdown();
+ if (!coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ // Interrupt the remaining work so the coordinator thread does not outlive this test.
+ coordinatorExecutor.shutdownNow();
+ throw new TimeoutException("Failed to close the CoordinatorExecutor before timeout.");
+ }
+ }
+
+ // --------------------------
+
+ /**
+ * Creates a new {@link SourceCoordinator} for a bounded {@link MockSource} with {@code NUM_SUBTASKS * 2}
+ * splits. The coordinator shares this test's {@code coordinatorExecutor} and {@code context}.
+ */
+ protected SourceCoordinator<?, ?> getNewSourceCoordinator() {
+ Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
+ new MockSource(Boundedness.BOUNDED, NUM_SUBTASKS * 2);
+ return new SourceCoordinator<>(OPERATOR_NAME, coordinatorExecutor, mockSource, context);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
new file mode 100644
index 0000000..86ca76e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
@@ -0,0 +1,173 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for @link {@link SplitAssignmentTracker}.
+ */
+public class SplitAssignmentTrackerTest {
+
+ @Test
+ public void testRecordIncrementalSplitAssignment() {
+ SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+ tracker.recordSplitAssignment(getSplitsAssignment(3, 0));
+ tracker.recordSplitAssignment(getSplitsAssignment(2, 6));
+
+ verifyAssignment(Arrays.asList("0", "6"), tracker.uncheckpointedAssignments().get(0));
+ verifyAssignment(Arrays.asList("1", "2", "7", "8"), tracker.uncheckpointedAssignments().get(1));
+ verifyAssignment(Arrays.asList("3", "4", "5"), tracker.uncheckpointedAssignments().get(2));
+ }
+
+ @Test
+ public void testTakeSnapshot() throws Exception {
+ final long checkpointId = 123L;
+ SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+ tracker.recordSplitAssignment(getSplitsAssignment(3, 0));
+
+ // Serialize
+ takeSnapshot(tracker, checkpointId);
+
+ // Verify the uncheckpointed assignments.
+ assertTrue(tracker.uncheckpointedAssignments().isEmpty());
+
+ // verify assignments put into the checkpoints.
+ Map<Long, Map<Integer, LinkedHashSet<MockSourceSplit>>> assignmentsByCheckpoints =
+ tracker.assignmentsByCheckpointId();
+ assertEquals(1, assignmentsByCheckpoints.size());
+
+ Map<Integer, LinkedHashSet<MockSourceSplit>> assignmentForCheckpoint = assignmentsByCheckpoints.get(checkpointId);
+ assertNotNull(assignmentForCheckpoint);
+
+ verifyAssignment(Arrays.asList("0"), assignmentForCheckpoint.get(0));
+ verifyAssignment(Arrays.asList("1", "2"), assignmentForCheckpoint.get(1));
+ verifyAssignment(Arrays.asList("3", "4", "5"), assignmentForCheckpoint.get(2));
+ }
+
+ @Test
+ public void testRestore() throws Exception {
+ final long checkpointId = 123L;
+ SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+ tracker.recordSplitAssignment(getSplitsAssignment(1, 0));
+
+ // Serialize
+ byte[] bytes = takeSnapshot(tracker, checkpointId);
+
+ // Deserialize
+ SplitAssignmentTracker<MockSourceSplit> deserializedTracker = restoreSnapshot(bytes);
+ // Verify the restore was successful.
+ assertEquals(deserializedTracker.assignmentsByCheckpointId(), tracker.assignmentsByCheckpointId());
+ assertEquals(deserializedTracker.uncheckpointedAssignments(), tracker.uncheckpointedAssignments());
+ }
+
+ @Test
+ public void testOnCheckpointComplete() throws Exception {
+ final long checkpointId1 = 100L;
+ final long checkpointId2 = 101L;
+ SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+ // Assign some splits to subtask 0 and 1.
+ tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+
+ // Take the first snapshot.
+ takeSnapshot(tracker, checkpointId1);
+ verifyAssignment(Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
+ verifyAssignment(Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
+
+ // Assign additional splits to subtask 0 and 1.
+ tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+
+ // Take the second snapshot.
+ takeSnapshot(tracker, checkpointId2);
+ verifyAssignment(Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
+ verifyAssignment(Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
+ verifyAssignment(Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
+ verifyAssignment(Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
+
+ // Complete the first checkpoint.
+ tracker.onCheckpointComplete(checkpointId1);
+ assertNull(tracker.assignmentsByCheckpointId(checkpointId1));
+ verifyAssignment(Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
+ verifyAssignment(Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
+ }
+
+ @Test
+ public void testGetAndRemoveUncheckpointedAssignment() throws Exception {
+ final long checkpointId1 = 100L;
+ final long checkpointId2 = 101L;
+ SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+ // Assign some splits and take snapshot 1.
+ tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+ takeSnapshot(tracker, checkpointId1);
+
+ // Assign some more splits and take snapshot 2.
+ tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+ takeSnapshot(tracker, checkpointId2);
+
+ // Now assume subtask 0 has failed.
+ List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0);
+ verifyAssignment(Arrays.asList("0", "3"), splitsToPutBack);
+ }
+
+ // ---------------------
+
+ private byte[] takeSnapshot(SplitAssignmentTracker<MockSourceSplit> tracker, long checkpointId) throws Exception {
+ byte[] bytes;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+ tracker.snapshotState(checkpointId, new MockSourceSplitSerializer(), out);
+ out.flush();
+ bytes = baos.toByteArray();
+ }
+ return bytes;
+ }
+
+ private SplitAssignmentTracker<MockSourceSplit> restoreSnapshot(byte[] bytes) throws Exception {
+ SplitAssignmentTracker<MockSourceSplit> deserializedTracker;
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+ deserializedTracker = new SplitAssignmentTracker<>();
+ deserializedTracker.restoreState(new MockSourceSplitSerializer(), in);
+ }
+ return deserializedTracker;
+ }
+}