You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:49 UTC
[flink] 01/03: [FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9bda67795628576aa4f161df6cb976ba71c3936d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jul 27 15:48:38 2022 +0800
[FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators
This closes #20374
---
.../source/coordinator/SourceCoordinator.java | 23 +++++++++++++--
.../coordinator/SourceCoordinatorProvider.java | 8 +++--
.../coordinator/SourceCoordinatorProviderTest.java | 3 +-
.../source/coordinator/SourceCoordinatorTest.java | 34 ++++++++++++++++++----
.../coordinator/SourceCoordinatorTestBase.java | 3 +-
.../api/operators/SourceOperatorFactory.java | 11 ++++++-
.../api/transformations/SourceTransformation.java | 12 ++++++++
.../SourceTransformationTranslator.java | 1 +
8 files changed, 83 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index eff63a023a9..c4083f9d173 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -111,6 +111,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
/** A flag marking whether the coordinator has started. */
private boolean started;
+ /**
+ * An ID that the coordinator will register self in the coordinator store with. Other
+ * coordinators may send events to this coordinator by the ID.
+ */
+ @Nullable private final String coordinatorListeningID;
+
public SourceCoordinator(
String operatorName,
Source<?, SplitT, EnumChkT> source,
@@ -121,7 +127,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
source,
context,
coordinatorStore,
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null);
}
public SourceCoordinator(
@@ -129,13 +136,15 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
Source<?, SplitT, EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore,
- WatermarkAlignmentParams watermarkAlignmentParams) {
+ WatermarkAlignmentParams watermarkAlignmentParams,
+ @Nullable String coordinatorListeningID) {
this.operatorName = operatorName;
this.source = source;
this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
this.context = context;
this.coordinatorStore = coordinatorStore;
this.watermarkAlignmentParams = watermarkAlignmentParams;
+ this.coordinatorListeningID = coordinatorListeningID;
if (watermarkAlignmentParams.isEnabled()) {
if (context.isConcurrentExecutionAttemptsSupported()) {
@@ -214,6 +223,16 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
// We rely on the single-threaded coordinator executor to guarantee
// the other methods are invoked after the enumerator has started.
runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
+
+ if (coordinatorListeningID != null) {
+ if (coordinatorStore.containsKey(coordinatorListeningID)) {
+ // The coordinator will be recreated after global failover. It should be registered
+ // again replacing the previous one.
+ coordinatorStore.computeIfPresent(coordinatorListeningID, (id, origin) -> this);
+ } else {
+ coordinatorStore.putIfAbsent(coordinatorListeningID, this);
+ }
+ }
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 3575c13c974..c76a3ec1910 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -43,6 +43,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
private final Source<?, SplitT, ?> source;
private final int numWorkerThreads;
private final WatermarkAlignmentParams alignmentParams;
+ @Nullable private final String coordinatorListeningID;
/**
* Construct the {@link SourceCoordinatorProvider}.
@@ -60,12 +61,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
OperatorID operatorID,
Source<?, SplitT, ?> source,
int numWorkerThreads,
- WatermarkAlignmentParams alignmentParams) {
+ WatermarkAlignmentParams alignmentParams,
+ @Nullable String coordinatorListeningID) {
super(operatorID);
this.operatorName = operatorName;
this.source = source;
this.numWorkerThreads = numWorkerThreads;
this.alignmentParams = alignmentParams;
+ this.coordinatorListeningID = coordinatorListeningID;
}
@Override
@@ -87,7 +90,8 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
source,
sourceCoordinatorContext,
context.getCoordinatorStore(),
- alignmentParams);
+ alignmentParams,
+ coordinatorListeningID);
}
/** A thread factory class that provides some helper methods. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 5dd7196a82e..5452c4c0e06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -55,7 +55,8 @@ public class SourceCoordinatorProviderTest {
OPERATOR_ID,
new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
1,
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null);
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 03bd8a30c3b..73ef276d7fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -249,7 +250,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
new EnumeratorCreatingSource<>(() -> splitEnumerator),
context,
new CoordinatorStoreImpl(),
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED)) {
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null)) {
coordinator.start();
waitUtil(
@@ -273,7 +275,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
}),
context,
new CoordinatorStoreImpl(),
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null);
coordinator.start();
@@ -299,7 +302,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
new EnumeratorCreatingSource<>(() -> splitEnumerator),
context,
new CoordinatorStoreImpl(),
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED)) {
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null)) {
coordinator.start();
coordinator.handleEventFromOperator(1, 0, new SourceEventWrapper(new SourceEvent() {}));
@@ -382,7 +386,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
context.getOperatorId(),
source,
1,
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null);
final OperatorCoordinator coordinator = provider.getCoordinator(context);
coordinator.start();
@@ -409,7 +414,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
context.getOperatorId(),
source,
1,
- WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null);
final OperatorCoordinator coordinator = provider.getCoordinator(context);
coordinator.resetToCheckpoint(1L, createEmptyCheckpoint());
@@ -505,6 +511,24 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
assertThat(events.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
}
+ @Test
+ public void testListeningEventsFromOtherCoordinators() throws Exception {
+ final String listeningID = "testListeningID";
+
+ CoordinatorStore store = new CoordinatorStoreImpl();
+ final SourceCoordinator<?, ?> coordinator =
+ new SourceCoordinator<>(
+ OPERATOR_NAME,
+ createMockSource(),
+ context,
+ store,
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ listeningID);
+ coordinator.start();
+
+ assertThat(store.get(listeningID)).isNotNull().isSameAs(coordinator);
+ }
+
// ------------------------------------------------------------------------
// test helpers
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index fa52d1feae8..94d285b2a65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -204,7 +204,8 @@ abstract class SourceCoordinatorTestBase {
mockSource,
getNewSourceCoordinatorContext(),
new CoordinatorStoreImpl(),
- watermarkAlignmentParams);
+ watermarkAlignmentParams,
+ null);
}
Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 2bd945d5f4c..4c364beb2b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import org.apache.flink.util.function.FunctionWithException;
+import javax.annotation.Nullable;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** The Factory class for {@link SourceOperator}. */
@@ -54,6 +56,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
/** The number of worker thread for the source coordinator. */
private final int numCoordinatorWorkerThread;
+ private @Nullable String coordinatorListeningID;
+
public SourceOperatorFactory(
Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
this(source, watermarkStrategy, true /* emit progressive watermarks */, 1);
@@ -81,6 +85,10 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
return source.getBoundedness();
}
+ public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
+ this.coordinatorListeningID = coordinatorListeningID;
+ }
+
@Override
public <T extends StreamOperator<OUT>> T createStreamOperator(
StreamOperatorParameters<OUT> parameters) {
@@ -128,7 +136,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
operatorID,
source,
numCoordinatorWorkerThread,
- watermarkStrategy.getAlignmentParameters());
+ watermarkStrategy.getAlignmentParameters(),
+ coordinatorListeningID);
}
@SuppressWarnings("rawtypes")
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index 49a9042f3ad..0ed6ccd8d82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.List;
@@ -41,6 +43,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
private final WatermarkStrategy<OUT> watermarkStrategy;
private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
+ private @Nullable String coordinatorListeningID;
/**
* Creates a new {@code Transformation} with the given name, output type and parallelism.
@@ -94,4 +97,13 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
public ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
+
+ public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
+ this.coordinatorListeningID = coordinatorListeningID;
+ }
+
+ @Nullable
+ public String getCoordinatorListeningID() {
+ return coordinatorListeningID;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 88f68d2d66e..af251755a77 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -77,6 +77,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
emitProgressiveWatermarks);
operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
+ operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());
streamGraph.addSource(
transformationId,