You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:49 UTC

[flink] 01/03: [FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9bda67795628576aa4f161df6cb976ba71c3936d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jul 27 15:48:38 2022 +0800

    [FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators
    
    This closes #20374
---
 .../source/coordinator/SourceCoordinator.java      | 23 +++++++++++++--
 .../coordinator/SourceCoordinatorProvider.java     |  8 +++--
 .../coordinator/SourceCoordinatorProviderTest.java |  3 +-
 .../source/coordinator/SourceCoordinatorTest.java  | 34 ++++++++++++++++++----
 .../coordinator/SourceCoordinatorTestBase.java     |  3 +-
 .../api/operators/SourceOperatorFactory.java       | 11 ++++++-
 .../api/transformations/SourceTransformation.java  | 12 ++++++++
 .../SourceTransformationTranslator.java            |  1 +
 8 files changed, 83 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index eff63a023a9..c4083f9d173 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -111,6 +111,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     /** A flag marking whether the coordinator has started. */
     private boolean started;
 
+    /**
+     * An ID that the coordinator will register self in the coordinator store with. Other
+     * coordinators may send events to this coordinator by the ID.
+     */
+    @Nullable private final String coordinatorListeningID;
+
     public SourceCoordinator(
             String operatorName,
             Source<?, SplitT, EnumChkT> source,
@@ -121,7 +127,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
                 source,
                 context,
                 coordinatorStore,
-                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                null);
     }
 
     public SourceCoordinator(
@@ -129,13 +136,15 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
             Source<?, SplitT, EnumChkT> source,
             SourceCoordinatorContext<SplitT> context,
             CoordinatorStore coordinatorStore,
-            WatermarkAlignmentParams watermarkAlignmentParams) {
+            WatermarkAlignmentParams watermarkAlignmentParams,
+            @Nullable String coordinatorListeningID) {
         this.operatorName = operatorName;
         this.source = source;
         this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
         this.context = context;
         this.coordinatorStore = coordinatorStore;
         this.watermarkAlignmentParams = watermarkAlignmentParams;
+        this.coordinatorListeningID = coordinatorListeningID;
 
         if (watermarkAlignmentParams.isEnabled()) {
             if (context.isConcurrentExecutionAttemptsSupported()) {
@@ -214,6 +223,16 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         // We rely on the single-threaded coordinator executor to guarantee
         // the other methods are invoked after the enumerator has started.
         runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
+
+        if (coordinatorListeningID != null) {
+            if (coordinatorStore.containsKey(coordinatorListeningID)) {
+                // The coordinator will be recreated after global failover. It should be registered
+                // again replacing the previous one.
+                coordinatorStore.computeIfPresent(coordinatorListeningID, (id, origin) -> this);
+            } else {
+                coordinatorStore.putIfAbsent(coordinatorListeningID, this);
+            }
+        }
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 3575c13c974..c76a3ec1910 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -43,6 +43,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
     private final Source<?, SplitT, ?> source;
     private final int numWorkerThreads;
     private final WatermarkAlignmentParams alignmentParams;
+    @Nullable private final String coordinatorListeningID;
 
     /**
      * Construct the {@link SourceCoordinatorProvider}.
@@ -60,12 +61,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
             OperatorID operatorID,
             Source<?, SplitT, ?> source,
             int numWorkerThreads,
-            WatermarkAlignmentParams alignmentParams) {
+            WatermarkAlignmentParams alignmentParams,
+            @Nullable String coordinatorListeningID) {
         super(operatorID);
         this.operatorName = operatorName;
         this.source = source;
         this.numWorkerThreads = numWorkerThreads;
         this.alignmentParams = alignmentParams;
+        this.coordinatorListeningID = coordinatorListeningID;
     }
 
     @Override
@@ -87,7 +90,8 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
                 source,
                 sourceCoordinatorContext,
                 context.getCoordinatorStore(),
-                alignmentParams);
+                alignmentParams,
+                coordinatorListeningID);
     }
 
     /** A thread factory class that provides some helper methods. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 5dd7196a82e..5452c4c0e06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -55,7 +55,8 @@ public class SourceCoordinatorProviderTest {
                         OPERATOR_ID,
                         new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
                         1,
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 03bd8a30c3b..73ef276d7fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -249,7 +250,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context,
                                 new CoordinatorStoreImpl(),
-                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED)) {
+                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                                null)) {
 
             coordinator.start();
             waitUtil(
@@ -273,7 +275,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                                 }),
                         context,
                         new CoordinatorStoreImpl(),
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
 
         coordinator.start();
 
@@ -299,7 +302,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context,
                                 new CoordinatorStoreImpl(),
-                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED)) {
+                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                                null)) {
 
             coordinator.start();
             coordinator.handleEventFromOperator(1, 0, new SourceEventWrapper(new SourceEvent() {}));
@@ -382,7 +386,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                         context.getOperatorId(),
                         source,
                         1,
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
 
         final OperatorCoordinator coordinator = provider.getCoordinator(context);
         coordinator.start();
@@ -409,7 +414,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                         context.getOperatorId(),
                         source,
                         1,
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
 
         final OperatorCoordinator coordinator = provider.getCoordinator(context);
         coordinator.resetToCheckpoint(1L, createEmptyCheckpoint());
@@ -505,6 +511,24 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
         assertThat(events.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
     }
 
+    @Test
+    public void testListeningEventsFromOtherCoordinators() throws Exception {
+        final String listeningID = "testListeningID";
+
+        CoordinatorStore store = new CoordinatorStoreImpl();
+        final SourceCoordinator<?, ?> coordinator =
+                new SourceCoordinator<>(
+                        OPERATOR_NAME,
+                        createMockSource(),
+                        context,
+                        store,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        listeningID);
+        coordinator.start();
+
+        assertThat(store.get(listeningID)).isNotNull().isSameAs(coordinator);
+    }
+
     // ------------------------------------------------------------------------
     //  test helpers
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index fa52d1feae8..94d285b2a65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -204,7 +204,8 @@ abstract class SourceCoordinatorTestBase {
                 mockSource,
                 getNewSourceCoordinatorContext(),
                 new CoordinatorStoreImpl(),
-                watermarkAlignmentParams);
+                watermarkAlignmentParams,
+                null);
     }
 
     Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 2bd945d5f4c..4c364beb2b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 import org.apache.flink.util.function.FunctionWithException;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** The Factory class for {@link SourceOperator}. */
@@ -54,6 +56,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
     /** The number of worker thread for the source coordinator. */
     private final int numCoordinatorWorkerThread;
 
+    private @Nullable String coordinatorListeningID;
+
     public SourceOperatorFactory(
             Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
         this(source, watermarkStrategy, true /* emit progressive watermarks */, 1);
@@ -81,6 +85,10 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
         return source.getBoundedness();
     }
 
+    public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
+        this.coordinatorListeningID = coordinatorListeningID;
+    }
+
     @Override
     public <T extends StreamOperator<OUT>> T createStreamOperator(
             StreamOperatorParameters<OUT> parameters) {
@@ -128,7 +136,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                 operatorID,
                 source,
                 numCoordinatorWorkerThread,
-                watermarkStrategy.getAlignmentParameters());
+                watermarkStrategy.getAlignmentParameters(),
+                coordinatorListeningID);
     }
 
     @SuppressWarnings("rawtypes")
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index 49a9042f3ad..0ed6ccd8d82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 
@@ -41,6 +43,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
     private final WatermarkStrategy<OUT> watermarkStrategy;
 
     private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
+    private @Nullable String coordinatorListeningID;
 
     /**
      * Creates a new {@code Transformation} with the given name, output type and parallelism.
@@ -94,4 +97,13 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
     public ChainingStrategy getChainingStrategy() {
         return chainingStrategy;
     }
+
+    public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
+        this.coordinatorListeningID = coordinatorListeningID;
+    }
+
+    @Nullable
+    public String getCoordinatorListeningID() {
+        return coordinatorListeningID;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 88f68d2d66e..af251755a77 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -77,6 +77,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
                         emitProgressiveWatermarks);
 
         operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
+        operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());
 
         streamGraph.addSource(
                 transformationId,