You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 16:27:58 UTC
[flink] 01/02: [FLINK-18039][connector/common] Introduce a
RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator
instance when resetToCheckpoint() is invoked. Let SourceCoordinator
leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint
reset.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 24201cc1b6c46c689abbff8635a01cec7b088983
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sun Jun 7 08:19:41 2020 +0800
[FLINK-18039][connector/common] Introduce a RecreateOnResetOperatorCoordinator class to recreate an OperatorCoordinator instance when resetToCheckpoint() is invoked. Let SourceCoordinator leverage RecreateOnResetOperatorCoordinator to ensure a clean checkpoint reset.
---
.../RecreateOnResetOperatorCoordinator.java | 195 +++++++++++++++++++++
.../source/coordinator/ExecutorNotifier.java | 25 ++-
.../source/coordinator/SourceCoordinator.java | 5 +-
.../coordinator/SourceCoordinatorContext.java | 11 +-
.../coordinator/SourceCoordinatorProvider.java | 18 +-
.../RecreateOnResetOperatorCoordinatorTest.java | 119 +++++++++++++
.../source/coordinator/ExecutorNotifierTest.java | 108 ++++++++++++
.../coordinator/SourceCoordinatorProviderTest.java | 99 +++++++++++
8 files changed, 561 insertions(+), 19 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
new file mode 100644
index 0000000..ea91156
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -0,0 +1,195 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper {@link OperatorCoordinator} that replaces its internal coordinator
+ * with a newly created instance whenever it is reset to a checkpoint.
+ */
+public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
+
+ private final Provider provider;
+ private QuiesceableContext quiesceableContext;
+ private OperatorCoordinator coordinator;
+
+ private boolean started;
+
+ private RecreateOnResetOperatorCoordinator(
+ QuiesceableContext context,
+ Provider provider) {
+ this.quiesceableContext = context;
+ this.provider = provider;
+ this.coordinator = provider.getCoordinator(context);
+ this.started = false;
+ }
+
+ @Override
+ public void start() throws Exception {
+ coordinator.start();
+ started = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ coordinator.close();
+ }
+
+ @Override
+ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+ coordinator.handleEventFromOperator(subtask, event);
+ }
+
+ @Override
+ public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+ coordinator.subtaskFailed(subtask, reason);
+ }
+
+ @Override
+ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
+ coordinator.checkpointCoordinator(checkpointId, resultFuture);
+ }
+
+ @Override
+ public void checkpointComplete(long checkpointId) {
+ coordinator.checkpointComplete(checkpointId);
+ }
+
+ @Override
+ public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+ // Quiesce the context so the coordinator cannot interact with the job master anymore.
+ quiesceableContext.quiesce();
+ // Close the coordinator.
+ coordinator.close();
+ // Create a new coordinator and reset to the checkpoint.
+ quiesceableContext = new QuiesceableContext(quiesceableContext.getContext());
+ coordinator = provider.getCoordinator(quiesceableContext);
+ coordinator.resetToCheckpoint(checkpointData);
+ // Start the new coordinator if this coordinator had already been started before the reset.
+ if (started) {
+ coordinator.start();
+ }
+ }
+
+ // ---------------------
+
+ @VisibleForTesting
+ public OperatorCoordinator getInternalCoordinator() {
+ return coordinator;
+ }
+
+ @VisibleForTesting
+ QuiesceableContext getQuiesceableContext() {
+ return quiesceableContext;
+ }
+
+ // ---------------------
+
+ public static abstract class Provider implements OperatorCoordinator.Provider {
+ private static final long serialVersionUID = 3002837631612629071L;
+ private final OperatorID operatorID;
+
+ public Provider(OperatorID operatorID) {
+ this.operatorID = operatorID;
+ }
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ @Override
+ public OperatorCoordinator create(Context context) {
+ QuiesceableContext quiesceableContext = new QuiesceableContext(context);
+ return new RecreateOnResetOperatorCoordinator(quiesceableContext, this);
+ }
+
+ protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context);
+ }
+
+ // ----------------------
+
+ /**
+ * A wrapper class around the operator coordinator context to allow quiescence.
+ * When a new operator coordinator is created, we need to quiesce the old
+ * operator coordinator to prevent it from having any further impact on the
+ * job master. This is done by quiescing the operator coordinator context.
+ * After the quiescence, the "reading" methods will still work, but the
+ * "writing" methods will become a no-op or fail immediately.
+ */
+ @VisibleForTesting
+ static class QuiesceableContext implements OperatorCoordinator.Context {
+ private final OperatorCoordinator.Context context;
+ private volatile boolean quiesced;
+
+ QuiesceableContext(OperatorCoordinator.Context context) {
+ this.context = context;
+ quiesced = false;
+ }
+
+ @Override
+ public OperatorID getOperatorId() {
+ return context.getOperatorId();
+ }
+
+ @Override
+ public synchronized CompletableFuture<Acknowledge> sendEvent(
+ OperatorEvent evt,
+ int targetSubtask) throws TaskNotRunningException {
+ // Do not enter the sending procedure if the context has been quiesced.
+ if (quiesced) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+ return context.sendEvent(evt, targetSubtask);
+ }
+
+ @Override
+ public synchronized void failJob(Throwable cause) {
+ if (quiesced) {
+ return;
+ }
+ context.failJob(cause);
+ }
+
+ @Override
+ public int currentParallelism() {
+ return context.currentParallelism();
+ }
+
+ @VisibleForTesting
+ synchronized void quiesce() {
+ quiesced = true;
+ }
+
+ @VisibleForTesting
+ boolean isQuiesced() {
+ return quiesced;
+ }
+
+ private OperatorCoordinator.Context getContext() {
+ return context;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index e730a15..c865df5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -25,21 +25,24 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
* This class is used to coordinate between two components, where one component has an
* executor following the mailbox model and the other component notifies it when needed.
*/
-public class ExecutorNotifier {
+public class ExecutorNotifier implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
- private ScheduledExecutorService workerExecutor;
- private Executor executorToNotify;
+ private final ScheduledExecutorService workerExecutor;
+ private final Executor executorToNotify;
+ private final AtomicBoolean closed;
public ExecutorNotifier(ScheduledExecutorService workerExecutor,
Executor executorToNotify) {
this.executorToNotify = executorToNotify;
this.workerExecutor = workerExecutor;
+ this.closed = new AtomicBoolean(false);
}
/**
@@ -134,4 +137,20 @@ public class ExecutorNotifier {
}
}, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
}
+
+ /**
+ * Close the executor notifier. Queued worker tasks are cancelled and running ones are
+ * interrupted; this call then blocks until the worker executor has terminated.
+ *
+ * @throws InterruptedException when interrupted during closure.
+ */
+ public void close() throws InterruptedException {
+ if (!closed.compareAndSet(false, true)) {
+ LOG.debug("The executor notifier has been closed.");
+ return;
+ }
+ // Shutdown the worker executor, so no more worker tasks can run.
+ workerExecutor.shutdownNow();
+ workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index dc5f1dd..3aa095d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -113,6 +113,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
boolean successfullyClosed = false;
try {
if (started) {
+ context.close();
enumerator.close();
}
} finally {
@@ -202,10 +203,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
"only be reset to a checkpoint before it starts.", operatorName));
}
LOG.info("Resetting coordinator of source {} from checkpoint.", operatorName);
- if (started) {
- enumerator.close();
- }
- LOG.info("Resetting SourceCoordinator from checkpoint.");
fromBytes(checkpointData);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 47fe564..322778a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
@@ -77,7 +78,8 @@ import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerde
* @param <SplitT> the type of the splits.
*/
@Internal
-public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+public class SourceCoordinatorContext<SplitT extends SourceSplit>
+ implements SplitEnumeratorContext<SplitT>, AutoCloseable {
private final ExecutorService coordinatorExecutor;
private final ExecutorNotifier notifier;
private final OperatorCoordinator.Context operatorCoordinatorContext;
@@ -192,6 +194,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> implements Spl
notifier.notifyReadyAsync(callable, handler);
}
+ @Override
+ public void close() throws InterruptedException {
+ notifier.close();
+ coordinatorExecutor.shutdown();
+ coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
// --------- Package private additional methods for the SourceCoordinator ------------
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 77ad7a3..8ef3f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import java.util.concurrent.Callable;
@@ -33,10 +34,9 @@ import java.util.function.BiConsumer;
/**
* The provider of {@link SourceCoordinator}.
*/
-public class SourceCoordinatorProvider<SplitT extends SourceSplit>
- implements OperatorCoordinator.Provider {
+public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends RecreateOnResetOperatorCoordinator.Provider {
+ private static final long serialVersionUID = -1921681440009738462L;
private final String operatorName;
- private final OperatorID operatorID;
private final Source<?, SplitT, ?> source;
private final int numWorkerThreads;
@@ -56,25 +56,21 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
OperatorID operatorID,
Source<?, SplitT, ?> source,
int numWorkerThreads) {
+ super(operatorID);
this.operatorName = operatorName;
- this.operatorID = operatorID;
this.source = source;
this.numWorkerThreads = numWorkerThreads;
}
@Override
- public OperatorID getOperatorId() {
- return operatorID;
- }
-
- @Override
- public OperatorCoordinator create(OperatorCoordinator.Context context) {
+ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(coordinatorThreadName);
ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
- new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, context);
+ new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads,
+ context);
return new SourceCoordinator<>(operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
new file mode 100644
index 0000000..f4298ce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -0,0 +1,119 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link RecreateOnResetOperatorCoordinator}.
+ */
+public class RecreateOnResetOperatorCoordinatorTest {
+ private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+ private static final int NUM_SUBTASKS = 1;
+
+ @Test
+ public void testQuiesceableContextNotQuiesced() throws TaskNotRunningException {
+ MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+ RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
+ new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);
+
+ TestingEvent event = new TestingEvent();
+ quiesceableContext.sendEvent(event, 0);
+ quiesceableContext.failJob(new Exception());
+
+ assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
+ assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
+ assertEquals(Collections.singletonList(event), context.getEventsToOperatorBySubtaskId(0));
+ assertTrue(context.isJobFailed());
+ }
+
+ @Test
+ public void testQuiescedContext() throws TaskNotRunningException {
+ MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+ RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
+ new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);
+
+ quiesceableContext.quiesce();
+ quiesceableContext.sendEvent(new TestingEvent(), 0);
+ quiesceableContext.failJob(new Exception());
+
+ assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
+ assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
+ assertTrue(context.getEventsToOperator().isEmpty());
+ assertFalse(context.isJobFailed());
+ }
+
+ @Test
+ public void testResetToCheckpoint() throws Exception {
+ TestingCoordinatorProvider provider = new TestingCoordinatorProvider();
+ MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+ RecreateOnResetOperatorCoordinator coordinator = createCoordinator(provider, context);
+
+ RecreateOnResetOperatorCoordinator.QuiesceableContext contextBeforeReset = coordinator.getQuiesceableContext();
+ TestingOperatorCoordinator internalCoordinatorBeforeReset = getInternalCoordinator(coordinator);
+
+ byte[] stateToRestore = new byte[0];
+ coordinator.resetToCheckpoint(stateToRestore);
+
+ assertTrue(contextBeforeReset.isQuiesced());
+ assertNull(internalCoordinatorBeforeReset.getLastRestoredCheckpointState());
+
+ TestingOperatorCoordinator internalCoordinatorAfterReset = getInternalCoordinator(coordinator);
+ assertEquals(stateToRestore, internalCoordinatorAfterReset.getLastRestoredCheckpointState());
+ }
+
+ // ---------------
+
+ private RecreateOnResetOperatorCoordinator createCoordinator(
+ TestingCoordinatorProvider provider,
+ OperatorCoordinator.Context context) {
+ return (RecreateOnResetOperatorCoordinator) provider.create(context);
+ }
+
+ private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) {
+ return (TestingOperatorCoordinator) coordinator.getInternalCoordinator();
+ }
+
+ // ---------------
+
+ private static class TestingCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {
+ private static final long serialVersionUID = 4184184580789587013L;
+
+ public TestingCoordinatorProvider() {
+ super(OPERATOR_ID);
+ }
+
+ @Override
+ protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
+ return new TestingOperatorCoordinator(context);
+ }
+ }
+
+ private static class TestingEvent implements OperatorEvent {
+ private static final long serialVersionUID = -3289352911927668275L;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
new file mode 100644
index 0000000..2412a251
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -0,0 +1,108 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for {@link ExecutorNotifier}.
+ */
+public class ExecutorNotifierTest {
+ private ScheduledExecutorService workerExecutor;
+ private ExecutorService executorToNotify;
+ private ExecutorNotifier notifier;
+ private Throwable exceptionInHandler;
+
+ @Before
+ public void setup() {
+ this.exceptionInHandler = null;
+ this.workerExecutor = Executors.newSingleThreadScheduledExecutor(r ->
+ new Thread(r, "worker-thread"));
+ this.executorToNotify = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "main-thread");
+ t.setUncaughtExceptionHandler((thread, e) -> exceptionInHandler = e);
+ return t;
+ });
+ this.notifier = new ExecutorNotifier(
+ this.workerExecutor,
+ this.executorToNotify);
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ notifier.close();
+ closeExecutorToNotify();
+ }
+
+ @Test
+ public void testBasic() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger result = new AtomicInteger(0);
+ notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
+ result.set(v);
+ latch.countDown();
+ });
+ latch.await();
+ closeExecutorToNotify();
+ assertEquals(1234, result.get());
+ }
+
+ @Test
+ public void testExceptionInCallable() {
+ Exception exception = new Exception("Expected exception.");
+ notifier.notifyReadyAsync(
+ () -> {
+ throw exception;
+ },
+ (v, e) -> {
+ assertEquals(exception, e);
+ assertNull(v);
+ });
+ }
+
+ @Test
+ public void testExceptionInHandler() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ RuntimeException exception = new RuntimeException("Expected exception.");
+ notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
+ latch.countDown();
+ throw exception;
+ });
+ latch.await();
+ closeExecutorToNotify();
+ assertEquals(exception, exceptionInHandler);
+ }
+
+ private void closeExecutorToNotify() throws InterruptedException {
+ executorToNotify.shutdown();
+ executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
new file mode 100644
index 0000000..c86981c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -0,0 +1,99 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SourceCoordinatorProvider}.
+ */
+public class SourceCoordinatorProviderTest {
+ private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+ private static final int NUM_SPLITS = 10;
+ private SourceCoordinatorProvider<MockSourceSplit> provider;
+
+ @Before
+ public void setup() {
+ provider = new SourceCoordinatorProvider<>(
+ "SourceCoordinatorProviderTest",
+ OPERATOR_ID,
+ new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
+ 1);
+ }
+
+ @Test
+ public void testCreate() {
+ OperatorCoordinator coordinator =
+ provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS));
+ assertTrue(coordinator instanceof RecreateOnResetOperatorCoordinator);
+ }
+
+ @Test
+ public void testCheckpointAndReset() throws Exception {
+ final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS);
+ final RecreateOnResetOperatorCoordinator coordinator =
+ (RecreateOnResetOperatorCoordinator) provider.create(context);
+ final SourceCoordinator<?, ?> sourceCoordinator =
+ (SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+
+ // Start the coordinator.
+ coordinator.start();
+ // register reader 0 and take a checkpoint.
+ coordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location"));
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(0L, future);
+ byte[] bytes = future.get();
+
+ // Register reader 1.
+ coordinator.handleEventFromOperator(1, new ReaderRegistrationEvent(1, "location"));
+ // Wait until the coordinator context is updated with registration of reader 1.
+ while (sourceCoordinator.getContext().registeredReaders().size() < 2) {
+ Thread.sleep(1);
+ }
+
+ // reset the coordinator to the checkpoint which only contains reader 0.
+ coordinator.resetToCheckpoint(bytes);
+ final SourceCoordinator<?, ?> restoredSourceCoordinator =
+ (SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+ assertNotEquals("The restored source coordinator should be a different instance",
+ restoredSourceCoordinator, sourceCoordinator);
+ assertEquals("There should only be one registered reader.",
+ 1, restoredSourceCoordinator.getContext().registeredReaders().size());
+ assertNotNull("The only registered reader should be reader 0",
+ restoredSourceCoordinator.getContext().registeredReaders().get(0));
+ }
+
+}