You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/02/21 07:28:42 UTC

[flink] 03/03: [FLINK-24607] Make OperatorCoordinator closure more robust.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f19c2472c54aac97e4067f5398731ab90036d1a
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Thu Feb 10 15:13:55 2022 +0800

    [FLINK-24607] Make OperatorCoordinator closure more robust.
---
 .../RecreateOnResetOperatorCoordinator.java        | 12 ++++-
 .../source/coordinator/ExecutorNotifier.java       | 21 +-------
 .../source/coordinator/SourceCoordinator.java      | 36 ++++---------
 .../coordinator/SourceCoordinatorContext.java      | 24 +++++----
 .../coordinator/SourceCoordinatorProvider.java     | 11 +---
 .../source/coordinator/ExecutorNotifierTest.java   | 14 ++---
 .../source/coordinator/SourceCoordinatorTest.java  | 63 ++++++++++++++++++++--
 .../coordinator/SourceCoordinatorTestBase.java     |  1 -
 8 files changed, 102 insertions(+), 80 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 5c660d0..ffab3ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -128,8 +128,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
         // capture the status whether the coordinator was started when this method was called
         final boolean wasStarted = this.started;
 
-        closingFuture.thenRun(
-                () -> {
+        closingFuture.whenComplete(
+                (ignored, e) -> {
+                    if (e != null) {
+                        LOG.warn(
+                                String.format(
+                                        "Received exception when closing "
+                                                + "operator coordinator for %s.",
+                                        oldCoordinator.operatorId),
+                                e);
+                    }
                     if (!closed) {
                         // The previous coordinator has closed. Create a new one.
                         newCoordinator.createNewInternalCoordinator(context, provider);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index e52f6cd..fe4cf8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -25,23 +25,20 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 /**
  * This class is used to coordinate between two components, where one component has an executor
  * following the mailbox model and the other component notifies it when needed.
  */
-public class ExecutorNotifier implements AutoCloseable {
+public class ExecutorNotifier {
     private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
     private final ScheduledExecutorService workerExecutor;
     private final Executor executorToNotify;
-    private final AtomicBoolean closed;
 
     public ExecutorNotifier(ScheduledExecutorService workerExecutor, Executor executorToNotify) {
         this.executorToNotify = executorToNotify;
         this.workerExecutor = workerExecutor;
-        this.closed = new AtomicBoolean(false);
     }
 
     /**
@@ -140,20 +137,4 @@ public class ExecutorNotifier implements AutoCloseable {
                 periodMs,
                 TimeUnit.MILLISECONDS);
     }
-
-    /**
-     * Close the executor notifier. This is a blocking call which waits for all the async calls to
-     * finish before it returns.
-     *
-     * @throws InterruptedException when interrupted during closure.
-     */
-    public void close() throws InterruptedException {
-        if (!closed.compareAndSet(false, true)) {
-            LOG.debug("The executor notifier has been closed.");
-            return;
-        }
-        // Shutdown the worker executor, so no more worker tasks can run.
-        workerExecutor.shutdownNow();
-        workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index b2916dd..c1c61b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -61,12 +61,13 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion;
+import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -93,8 +94,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
 
     /** The name of the operator this SourceCoordinator is associated with. */
     private final String operatorName;
-    /** A single-thread executor to handle all the changes to the coordinator. */
-    private final ScheduledExecutorService coordinatorExecutor;
     /** The Source that is associated with this SourceCoordinator. */
     private final Source<?, SplitT, EnumChkT> source;
     /** The serializer that handles the serde of the SplitEnumerator checkpoints. */
@@ -113,13 +112,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
 
     public SourceCoordinator(
             String operatorName,
-            ScheduledExecutorService coordinatorExecutor,
             Source<?, SplitT, EnumChkT> source,
             SourceCoordinatorContext<SplitT> context,
             CoordinatorStore coordinatorStore) {
         this(
                 operatorName,
-                coordinatorExecutor,
                 source,
                 context,
                 coordinatorStore,
@@ -128,13 +125,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
 
     public SourceCoordinator(
             String operatorName,
-            ScheduledExecutorService coordinatorExecutor,
             Source<?, SplitT, EnumChkT> source,
             SourceCoordinatorContext<SplitT> context,
             CoordinatorStore coordinatorStore,
             WatermarkAlignmentParams watermarkAlignmentParams) {
         this.operatorName = operatorName;
-        this.coordinatorExecutor = coordinatorExecutor;
         this.source = source;
         this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
         this.context = context;
@@ -144,11 +139,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         if (watermarkAlignmentParams.isEnabled()) {
             coordinatorStore.putIfAbsent(
                     watermarkAlignmentParams.getWatermarkGroup(), new WatermarkAggregator<>());
-            coordinatorExecutor.scheduleAtFixedRate(
-                    this::announceCombinedWatermark,
-                    watermarkAlignmentParams.getUpdateInterval(),
-                    watermarkAlignmentParams.getUpdateInterval(),
-                    TimeUnit.MILLISECONDS);
+            context.getCoordinatorExecutor()
+                    .scheduleAtFixedRate(
+                            this::announceCombinedWatermark,
+                            watermarkAlignmentParams.getUpdateInterval(),
+                            watermarkAlignmentParams.getUpdateInterval(),
+                            TimeUnit.MILLISECONDS);
         }
     }
 
@@ -216,18 +212,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     @Override
     public void close() throws Exception {
         LOG.info("Closing SourceCoordinator for source {}.", operatorName);
-        try {
-            if (started) {
-                context.close();
-                if (enumerator != null) {
-                    enumerator.close();
-                }
-            }
-        } finally {
-            coordinatorExecutor.shutdownNow();
-            // We do not expect this to actually block for long. At this point, there should
-            // be very few task running in the executor, if any.
-            coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        if (started) {
+            closeAll(asList(context, enumerator), Throwable.class);
         }
         LOG.info("Source coordinator for source {} closed.", operatorName);
     }
@@ -414,7 +400,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
             return;
         }
 
-        coordinatorExecutor.execute(
+        context.runInCoordinatorThread(
                 () -> {
                     try {
                         action.run();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 83823f7..064a743 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -50,12 +51,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
+
 /**
  * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext}
  * this class allows interaction with state and sending {@link OperatorEvent} to the SourceOperator
@@ -82,7 +83,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 
     private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
 
-    private final ExecutorService coordinatorExecutor;
+    private final ScheduledExecutorService workerExecutor;
+    private final ScheduledExecutorService coordinatorExecutor;
     private final ExecutorNotifier notifier;
     private final OperatorCoordinator.Context operatorCoordinatorContext;
     private final SimpleVersionedSerializer<SplitT> splitSerializer;
@@ -95,13 +97,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     private volatile boolean closed;
 
     public SourceCoordinatorContext(
-            ExecutorService coordinatorExecutor,
             SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
             int numWorkerThreads,
             OperatorCoordinator.Context operatorCoordinatorContext,
             SimpleVersionedSerializer<SplitT> splitSerializer) {
         this(
-                coordinatorExecutor,
+                Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                 Executors.newScheduledThreadPool(
                         numWorkerThreads,
                         new ExecutorThreadFactory(
@@ -115,12 +116,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     // Package private method for unit test.
     @VisibleForTesting
     SourceCoordinatorContext(
-            ExecutorService coordinatorExecutor,
+            ScheduledExecutorService coordinatorExecutor,
             ScheduledExecutorService workerExecutor,
             SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
             OperatorCoordinator.Context operatorCoordinatorContext,
             SimpleVersionedSerializer<SplitT> splitSerializer,
             SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+        this.workerExecutor = workerExecutor;
         this.coordinatorExecutor = coordinatorExecutor;
         this.coordinatorThreadFactory = coordinatorThreadFactory;
         this.operatorCoordinatorContext = operatorCoordinatorContext;
@@ -173,6 +175,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
                 String.format("Failed to send event %s to subtask %d", event, subtaskId));
     }
 
+    ScheduledExecutorService getCoordinatorExecutor() {
+        return coordinatorExecutor;
+    }
+
     @Override
     public int currentParallelism() {
         return operatorCoordinatorContext.currentParallelism();
@@ -259,9 +265,9 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     @Override
     public void close() throws InterruptedException {
         closed = true;
-        notifier.close();
-        coordinatorExecutor.shutdown();
-        coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        // Close quietly so the closing sequence will be executed completely.
+        shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
+        shutdownExecutorForcefully(coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
     }
 
     // --------- Package private additional methods for the SourceCoordinator ------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index b74c007..55df066 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -31,8 +31,6 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 import javax.annotation.Nullable;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.BiConsumer;
 
@@ -75,20 +73,13 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
         CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                 new CoordinatorExecutorThreadFactory(
                         coordinatorThreadName, context.getUserCodeClassloader());
-        ScheduledExecutorService coordinatorExecutor =
-                Executors.newScheduledThreadPool(1, coordinatorThreadFactory);
 
         SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
         SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
                 new SourceCoordinatorContext<>(
-                        coordinatorExecutor,
-                        coordinatorThreadFactory,
-                        numWorkerThreads,
-                        context,
-                        splitSerializer);
+                        coordinatorThreadFactory, numWorkerThreads, context, splitSerializer);
         return new SourceCoordinator<>(
                 operatorName,
-                coordinatorExecutor,
                 source,
                 sourceCoordinatorContext,
                 context.getCoordinatorStore(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
index 8f7a806..76a1b54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -22,6 +22,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -62,8 +64,8 @@ public class ExecutorNotifierTest {
 
     @After
     public void tearDown() throws InterruptedException {
-        notifier.close();
-        closeExecutorToNotify();
+        shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
+        shutdownExecutorForcefully(executorToNotify, Duration.ofNanos(Long.MAX_VALUE));
     }
 
     @Test
@@ -77,7 +79,6 @@ public class ExecutorNotifierTest {
                     latch.countDown();
                 });
         latch.await();
-        closeExecutorToNotify();
         assertEquals(1234, result.get());
     }
 
@@ -110,7 +111,6 @@ public class ExecutorNotifierTest {
                     throw exception2;
                 });
         latch.await();
-        closeExecutorToNotify();
         // The uncaught exception handler may fire after the executor has shutdown.
         // We need to wait on the countdown latch here.
         exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
@@ -128,15 +128,9 @@ public class ExecutorNotifierTest {
                     throw exception;
                 });
         latch.await();
-        closeExecutorToNotify();
         // The uncaught exception handler may fire after the executor has shutdown.
         // We need to wait on the countdown latch here.
         exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
         assertEquals(exception, exceptionInHandler);
     }
-
-    private void closeExecutorToNotify() throws InterruptedException {
-        executorToNotify.shutdown();
-        executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 71df249..12d29bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -34,10 +34,12 @@ import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.Test;
 
@@ -53,6 +55,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
@@ -244,7 +248,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                 final SourceCoordinator<?, ?> coordinator =
                         new SourceCoordinator<>(
                                 OPERATOR_NAME,
-                                coordinatorExecutor,
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context,
                                 new CoordinatorStoreImpl(),
@@ -266,7 +269,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
         final SourceCoordinator<?, ?> coordinator =
                 new SourceCoordinator<>(
                         OPERATOR_NAME,
-                        coordinatorExecutor,
                         new EnumeratorCreatingSource<>(
                                 () -> {
                                     throw failureReason;
@@ -296,7 +298,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                 final SourceCoordinator<?, ?> coordinator =
                         new SourceCoordinator<>(
                                 OPERATOR_NAME,
-                                coordinatorExecutor,
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context,
                                 new CoordinatorStoreImpl(),
@@ -314,6 +315,62 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
     }
 
     @Test
+    public void testBlockOnClose() throws Exception {
+        // It is possible that the split enumerator submits some heavy-duty work to the
+        // coordinator executor which blocks the coordinator closure.
+        final CountDownLatch latch = new CountDownLatch(1);
+        try (final MockSplitEnumeratorContext<MockSourceSplit> enumeratorContext =
+                        new MockSplitEnumeratorContext<>(1);
+                final MockSplitEnumerator splitEnumerator =
+                        new MockSplitEnumerator(1, enumeratorContext) {
+                            @Override
+                            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+                                context.callAsync(
+                                        () -> 1L,
+                                        (ignored, t) -> {
+                                            latch.countDown();
+                                            // Submit a callable that will never return.
+                                            try {
+                                                Thread.sleep(Long.MAX_VALUE);
+                                            } catch (InterruptedException e) {
+                                                throw new RuntimeException(e);
+                                            }
+                                        });
+                            }
+                        };
+                final SourceCoordinator<?, ?> coordinator =
+                        new SourceCoordinator<>(
+                                OPERATOR_NAME,
+                                new EnumeratorCreatingSource<>(() -> splitEnumerator),
+                                context,
+                                new CoordinatorStoreImpl())) {
+
+            coordinator.start();
+            coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {}));
+            // Wait until the coordinator executor blocks.
+            latch.await();
+
+            CompletableFuture<?> future =
+                    ComponentClosingUtils.closeAsyncWithTimeout(
+                            "testBlockOnClose",
+                            (ThrowingRunnable<Exception>) coordinator::close,
+                            Duration.ofMillis(1));
+
+            future.exceptionally(
+                            e -> {
+                                assertTrue(e instanceof TimeoutException);
+                                return null;
+                            })
+                    .get();
+
+            waitUtil(
+                    splitEnumerator::closed,
+                    Duration.ofSeconds(5),
+                    "Split enumerator was not closed in 5 seconds.");
+        }
+    }
+
+    @Test
     public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
         final ClassLoader testClassLoader = new URLClassLoader(new URL[0]);
         final OperatorCoordinator.Context context =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index f59cb2d..d9f6c32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -161,7 +161,6 @@ public abstract class SourceCoordinatorTestBase {
 
         return new SourceCoordinator<>(
                 OPERATOR_NAME,
-                coordinatorExecutor,
                 mockSource,
                 getNewSourceCoordinatorContext(),
                 new CoordinatorStoreImpl(),