You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/02/21 07:28:42 UTC
[flink] 03/03: [FLINK-24607] Make OperatorCoordinator closure more robust.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0f19c2472c54aac97e4067f5398731ab90036d1a
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Thu Feb 10 15:13:55 2022 +0800
[FLINK-24607] Make OperatorCoordinator closure more robust.
---
.../RecreateOnResetOperatorCoordinator.java | 12 ++++-
.../source/coordinator/ExecutorNotifier.java | 21 +-------
.../source/coordinator/SourceCoordinator.java | 36 ++++---------
.../coordinator/SourceCoordinatorContext.java | 24 +++++----
.../coordinator/SourceCoordinatorProvider.java | 11 +---
.../source/coordinator/ExecutorNotifierTest.java | 14 ++---
.../source/coordinator/SourceCoordinatorTest.java | 63 ++++++++++++++++++++--
.../coordinator/SourceCoordinatorTestBase.java | 1 -
8 files changed, 102 insertions(+), 80 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 5c660d0..ffab3ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -128,8 +128,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
// capture the status whether the coordinator was started when this method was called
final boolean wasStarted = this.started;
- closingFuture.thenRun(
- () -> {
+ closingFuture.whenComplete(
+ (ignored, e) -> {
+ if (e != null) {
+ LOG.warn(
+ String.format(
+ "Received exception when closing "
+ + "operator coordinator for %s.",
+ oldCoordinator.operatorId),
+ e);
+ }
if (!closed) {
// The previous coordinator has closed. Create a new one.
newCoordinator.createNewInternalCoordinator(context, provider);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index e52f6cd..fe4cf8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -25,23 +25,20 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
* This class is used to coordinate between two components, where one component has an executor
* following the mailbox model and the other component notifies it when needed.
*/
-public class ExecutorNotifier implements AutoCloseable {
+public class ExecutorNotifier {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
private final ScheduledExecutorService workerExecutor;
private final Executor executorToNotify;
- private final AtomicBoolean closed;
public ExecutorNotifier(ScheduledExecutorService workerExecutor, Executor executorToNotify) {
this.executorToNotify = executorToNotify;
this.workerExecutor = workerExecutor;
- this.closed = new AtomicBoolean(false);
}
/**
@@ -140,20 +137,4 @@ public class ExecutorNotifier implements AutoCloseable {
periodMs,
TimeUnit.MILLISECONDS);
}
-
- /**
- * Close the executor notifier. This is a blocking call which waits for all the async calls to
- * finish before it returns.
- *
- * @throws InterruptedException when interrupted during closure.
- */
- public void close() throws InterruptedException {
- if (!closed.compareAndSet(false, true)) {
- LOG.debug("The executor notifier has been closed.");
- return;
- }
- // Shutdown the worker executor, so no more worker tasks can run.
- workerExecutor.shutdownNow();
- workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index b2916dd..c1c61b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -61,12 +61,13 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.Arrays.asList;
import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion;
+import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -93,8 +94,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
/** The name of the operator this SourceCoordinator is associated with. */
private final String operatorName;
- /** A single-thread executor to handle all the changes to the coordinator. */
- private final ScheduledExecutorService coordinatorExecutor;
/** The Source that is associated with this SourceCoordinator. */
private final Source<?, SplitT, EnumChkT> source;
/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
@@ -113,13 +112,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
public SourceCoordinator(
String operatorName,
- ScheduledExecutorService coordinatorExecutor,
Source<?, SplitT, EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore) {
this(
operatorName,
- coordinatorExecutor,
source,
context,
coordinatorStore,
@@ -128,13 +125,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
public SourceCoordinator(
String operatorName,
- ScheduledExecutorService coordinatorExecutor,
Source<?, SplitT, EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore,
WatermarkAlignmentParams watermarkAlignmentParams) {
this.operatorName = operatorName;
- this.coordinatorExecutor = coordinatorExecutor;
this.source = source;
this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
this.context = context;
@@ -144,11 +139,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
if (watermarkAlignmentParams.isEnabled()) {
coordinatorStore.putIfAbsent(
watermarkAlignmentParams.getWatermarkGroup(), new WatermarkAggregator<>());
- coordinatorExecutor.scheduleAtFixedRate(
- this::announceCombinedWatermark,
- watermarkAlignmentParams.getUpdateInterval(),
- watermarkAlignmentParams.getUpdateInterval(),
- TimeUnit.MILLISECONDS);
+ context.getCoordinatorExecutor()
+ .scheduleAtFixedRate(
+ this::announceCombinedWatermark,
+ watermarkAlignmentParams.getUpdateInterval(),
+ watermarkAlignmentParams.getUpdateInterval(),
+ TimeUnit.MILLISECONDS);
}
}
@@ -216,18 +212,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
@Override
public void close() throws Exception {
LOG.info("Closing SourceCoordinator for source {}.", operatorName);
- try {
- if (started) {
- context.close();
- if (enumerator != null) {
- enumerator.close();
- }
- }
- } finally {
- coordinatorExecutor.shutdownNow();
- // We do not expect this to actually block for long. At this point, there should
- // be very few task running in the executor, if any.
- coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ if (started) {
+ closeAll(asList(context, enumerator), Throwable.class);
}
LOG.info("Source coordinator for source {} closed.", operatorName);
}
@@ -414,7 +400,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
return;
}
- coordinatorExecutor.execute(
+ context.runInCoordinatorThread(
() -> {
try {
action.run();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 83823f7..064a743 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -50,12 +51,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
+
/**
* A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext}
* this class allows interaction with state and sending {@link OperatorEvent} to the SourceOperator
@@ -82,7 +83,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
- private final ExecutorService coordinatorExecutor;
+ private final ScheduledExecutorService workerExecutor;
+ private final ScheduledExecutorService coordinatorExecutor;
private final ExecutorNotifier notifier;
private final OperatorCoordinator.Context operatorCoordinatorContext;
private final SimpleVersionedSerializer<SplitT> splitSerializer;
@@ -95,13 +97,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
private volatile boolean closed;
public SourceCoordinatorContext(
- ExecutorService coordinatorExecutor,
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
int numWorkerThreads,
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer) {
this(
- coordinatorExecutor,
+ Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
Executors.newScheduledThreadPool(
numWorkerThreads,
new ExecutorThreadFactory(
@@ -115,12 +116,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
// Package private method for unit test.
@VisibleForTesting
SourceCoordinatorContext(
- ExecutorService coordinatorExecutor,
+ ScheduledExecutorService coordinatorExecutor,
ScheduledExecutorService workerExecutor,
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer,
SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+ this.workerExecutor = workerExecutor;
this.coordinatorExecutor = coordinatorExecutor;
this.coordinatorThreadFactory = coordinatorThreadFactory;
this.operatorCoordinatorContext = operatorCoordinatorContext;
@@ -173,6 +175,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
String.format("Failed to send event %s to subtask %d", event, subtaskId));
}
+ ScheduledExecutorService getCoordinatorExecutor() {
+ return coordinatorExecutor;
+ }
+
@Override
public int currentParallelism() {
return operatorCoordinatorContext.currentParallelism();
@@ -259,9 +265,9 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
@Override
public void close() throws InterruptedException {
closed = true;
- notifier.close();
- coordinatorExecutor.shutdown();
- coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ // Close quietly so the closing sequence will be executed completely.
+ shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
+ shutdownExecutorForcefully(coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
}
// --------- Package private additional methods for the SourceCoordinator ------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index b74c007..55df066 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -31,8 +31,6 @@ import org.apache.flink.util.FatalExitExceptionHandler;
import javax.annotation.Nullable;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
@@ -75,20 +73,13 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(
coordinatorThreadName, context.getUserCodeClassloader());
- ScheduledExecutorService coordinatorExecutor =
- Executors.newScheduledThreadPool(1, coordinatorThreadFactory);
SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
new SourceCoordinatorContext<>(
- coordinatorExecutor,
- coordinatorThreadFactory,
- numWorkerThreads,
- context,
- splitSerializer);
+ coordinatorThreadFactory, numWorkerThreads, context, splitSerializer);
return new SourceCoordinator<>(
operatorName,
- coordinatorExecutor,
source,
sourceCoordinatorContext,
context.getCoordinatorStore(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
index 8f7a806..76a1b54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -22,6 +22,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -29,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -62,8 +64,8 @@ public class ExecutorNotifierTest {
@After
public void tearDown() throws InterruptedException {
- notifier.close();
- closeExecutorToNotify();
+ shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
+ shutdownExecutorForcefully(executorToNotify, Duration.ofNanos(Long.MAX_VALUE));
}
@Test
@@ -77,7 +79,6 @@ public class ExecutorNotifierTest {
latch.countDown();
});
latch.await();
- closeExecutorToNotify();
assertEquals(1234, result.get());
}
@@ -110,7 +111,6 @@ public class ExecutorNotifierTest {
throw exception2;
});
latch.await();
- closeExecutorToNotify();
 // The uncaught exception handler may fire after the executor has shut down.
// We need to wait on the countdown latch here.
exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
@@ -128,15 +128,9 @@ public class ExecutorNotifierTest {
throw exception;
});
latch.await();
- closeExecutorToNotify();
 // The uncaught exception handler may fire after the executor has shut down.
// We need to wait on the countdown latch here.
exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
assertEquals(exception, exceptionInHandler);
}
-
- private void closeExecutorToNotify() throws InterruptedException {
- executorToNotify.shutdown();
- executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 71df249..12d29bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -34,10 +34,12 @@ import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Test;
@@ -53,6 +55,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
@@ -244,7 +248,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
OPERATOR_NAME,
- coordinatorExecutor,
new EnumeratorCreatingSource<>(() -> splitEnumerator),
context,
new CoordinatorStoreImpl(),
@@ -266,7 +269,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
OPERATOR_NAME,
- coordinatorExecutor,
new EnumeratorCreatingSource<>(
() -> {
throw failureReason;
@@ -296,7 +298,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
OPERATOR_NAME,
- coordinatorExecutor,
new EnumeratorCreatingSource<>(() -> splitEnumerator),
context,
new CoordinatorStoreImpl(),
@@ -314,6 +315,62 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
}
@Test
+ public void testBlockOnClose() throws Exception {
+ // It is possible that the split enumerator submits some heavy-duty work to the
+ // coordinator executor which blocks the coordinator closure.
+ final CountDownLatch latch = new CountDownLatch(1);
+ try (final MockSplitEnumeratorContext<MockSourceSplit> enumeratorContext =
+ new MockSplitEnumeratorContext<>(1);
+ final MockSplitEnumerator splitEnumerator =
+ new MockSplitEnumerator(1, enumeratorContext) {
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ context.callAsync(
+ () -> 1L,
+ (ignored, t) -> {
+ latch.countDown();
+ // Block this handler indefinitely; the async callable itself returns right away.
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ };
+ final SourceCoordinator<?, ?> coordinator =
+ new SourceCoordinator<>(
+ OPERATOR_NAME,
+ new EnumeratorCreatingSource<>(() -> splitEnumerator),
+ context,
+ new CoordinatorStoreImpl())) {
+
+ coordinator.start();
+ coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {}));
+ // Wait until the coordinator executor blocks.
+ latch.await();
+
+ CompletableFuture<?> future =
+ ComponentClosingUtils.closeAsyncWithTimeout(
+ "testBlockOnClose",
+ (ThrowingRunnable<Exception>) coordinator::close,
+ Duration.ofMillis(1));
+
+ future.exceptionally(
+ e -> {
+ assertTrue(e instanceof TimeoutException);
+ return null;
+ })
+ .get();
+
+ waitUtil(
+ splitEnumerator::closed,
+ Duration.ofSeconds(5),
+ "Split enumerator was not closed in 5 seconds.");
+ }
+ }
+
+ @Test
public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
final ClassLoader testClassLoader = new URLClassLoader(new URL[0]);
final OperatorCoordinator.Context context =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index f59cb2d..d9f6c32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -161,7 +161,6 @@ public abstract class SourceCoordinatorTestBase {
return new SourceCoordinator<>(
OPERATOR_NAME,
- coordinatorExecutor,
mockSource,
getNewSourceCoordinatorContext(),
new CoordinatorStoreImpl(),