You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/12/27 01:27:51 UTC
[flink] branch master updated: [FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6e4e6c68a4a [FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource
6e4e6c68a4a is described below
commit 6e4e6c68a4a179d932dacafb2771cc84f730bbc0
Author: Ran Tao <ch...@gmail.com>
AuthorDate: Tue Dec 20 17:28:15 2022 +0800
[FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource
This closes #21464.
---
.../source/hybrid/HybridSourceSplitEnumerator.java | 34 +++++++++++++++---
.../hybrid/HybridSourceSplitEnumeratorTest.java | 37 +++++++++++++++++++
.../source/SupportsIntermediateNoMoreSplits.java | 41 ++++++++++++++++++++++
.../source/mocks/MockSplitEnumeratorContext.java | 20 +++++++++--
.../coordinator/SourceCoordinatorContext.java | 16 ++++++++-
.../coordinator/SourceCoordinatorContextTest.java | 20 +++++++++++
6 files changed, 161 insertions(+), 7 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index c09653ec71b..b1eeec1327c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.Preconditions;
@@ -157,7 +158,7 @@ public class HybridSourceSplitEnumerator
LOG.debug("Restoring splits to subtask={} {}", subtaskId, splits);
context.assignSplits(
new SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
- context.signalNoMoreSplits(subtaskId);
+ checkAndSignalNoMoreSplits(context, subtaskId, sourceIndex, sources.size());
}
if (splitsBySource.isEmpty()) {
pendingSplits.remove(subtaskId);
@@ -279,7 +280,11 @@ public class HybridSourceSplitEnumerator
currentEnumeratorCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
SplitEnumeratorContextProxy delegatingContext =
new SplitEnumeratorContextProxy(
- currentSourceIndex, context, readerSourceIndex, switchedSources);
+ currentSourceIndex,
+ context,
+ readerSourceIndex,
+ switchedSources,
+ sources.size());
try {
if (restoredEnumeratorState == null) {
currentEnumerator = source.createEnumerator(delegatingContext);
@@ -315,16 +320,19 @@ public class HybridSourceSplitEnumerator
private final int sourceIndex;
private final Map<Integer, Integer> readerSourceIndex;
private final SwitchedSources switchedSources;
+ private final int sourceSize;
private SplitEnumeratorContextProxy(
int sourceIndex,
SplitEnumeratorContext<HybridSourceSplit> realContext,
Map<Integer, Integer> readerSourceIndex,
- SwitchedSources switchedSources) {
+ SwitchedSources switchedSources,
+ int sourceSize) {
this.realContext = realContext;
this.sourceIndex = sourceIndex;
this.readerSourceIndex = readerSourceIndex;
this.switchedSources = switchedSources;
+ this.sourceSize = sourceSize;
}
@Override
@@ -392,7 +400,8 @@ public class HybridSourceSplitEnumerator
@Override
public void signalNoMoreSplits(int subtask) {
- realContext.signalNoMoreSplits(subtask);
+ // intercept noMoreSplits signaled by the child source enumerators
+ checkAndSignalNoMoreSplits(realContext, subtask, sourceIndex, sourceSize);
}
@Override
@@ -414,4 +423,21 @@ public class HybridSourceSplitEnumerator
realContext.runInCoordinatorThread(runnable);
}
}
+
+ private static void checkAndSignalNoMoreSplits(
+ SplitEnumeratorContext<HybridSourceSplit> context,
+ int subtaskId,
+ int sourceIndex,
+ int sourceSize) {
+ Preconditions.checkState(
+ context instanceof SupportsIntermediateNoMoreSplits,
+ "The split enumerator context %s must implement SupportsIntermediateNoMoreSplits "
+ + "to be used in hybrid source scenario.",
+ context.getClass().getCanonicalName());
+ if (sourceIndex >= sourceSize - 1) {
+ context.signalNoMoreSplits(subtaskId);
+ } else {
+ ((SupportsIntermediateNoMoreSplits) context).signalIntermediateNoMoreSplits(subtaskId);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 99d1d3d3f60..e4f268dedef 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -216,6 +216,43 @@ public class HybridSourceSplitEnumeratorTest {
Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(0, se);
}
+ @Test
+ public void testInterceptNoMoreSplitEvent() {
+ context = new MockSplitEnumeratorContext<>(2);
+ source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
+
+ enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
+ enumerator.start();
+ // mock enumerator assigns splits once all readers are registered
+ // At this time, hasNoMoreSplit check will call context.signalIntermediateNoMoreSplits
+ registerReader(context, enumerator, SUBTASK0);
+ registerReader(context, enumerator, SUBTASK1);
+ enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+ enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
+ assertThat(context.hasNoMoreSplits(0)).isFalse();
+ assertThat(context.hasNoMoreSplits(1)).isFalse();
+ splitFromSource0 =
+ context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
+
+ // the tasks have finished reading; the hasNoMoreSplit check will call
+ // context.signalNoMoreSplits, since this is the final finished event
+ enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
+ enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
+ assertThat(context.hasNoMoreSplits(0)).isTrue();
+ assertThat(context.hasNoMoreSplits(1)).isTrue();
+
+ // test adding splits back: SUBTASK0 then restores the splitFromSource0 split
+ // reset the splits assignment and the previous subtaskHasNoMoreSplits flag.
+ context.getSplitsAssignmentSequence().clear();
+ context.resetNoMoreSplits(0);
+ enumerator.addReader(SUBTASK0);
+ enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
+ enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+ assertThat(context.hasNoMoreSplits(0)).isFalse();
+ enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
+ assertThat(context.hasNoMoreSplits(0)).isTrue();
+ }
+
private static class UnderlyingEnumeratorWrapper
implements SplitEnumerator<MockSourceSplit, Object> {
private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsIntermediateNoMoreSplits.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsIntermediateNoMoreSplits.java
new file mode 100644
index 00000000000..8c58e6ee5bb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsIntermediateNoMoreSplits.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A decorative interface of {@link SplitEnumeratorContext} which allows handling intermediate
+ * NoMoreSplits events.
+ *
+ * <p>The split enumerator context must implement this interface if it needs to deal with
+ * NoMoreSplits events when a subtask can have multiple child sources, e.g. in a hybrid source.
+ */
+@Internal
+public interface SupportsIntermediateNoMoreSplits {
+ /**
+ * Signals a subtask that it will not receive more splits for the current source, but that it
+ * will receive splits for subsequent sources. A common scenario is HybridSource. This indicates
+ * that the task has not truly finished reading.
+ *
+ * @param subtask The index of the operator's parallel subtask that shall be signaled that it
+ * will receive splits later.
+ */
+ void signalIntermediateNoMoreSplits(int subtask);
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
index 42aad607b37..6389f198763 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.ThrowableCatchingRunnable;
@@ -49,7 +50,7 @@ import java.util.function.BiConsumer;
/** A mock class for {@link SplitEnumeratorContext}. */
public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
- implements SplitEnumeratorContext<SplitT>, AutoCloseable {
+ implements SplitEnumeratorContext<SplitT>, SupportsIntermediateNoMoreSplits, AutoCloseable {
private final Map<Integer, List<SourceEvent>> sentSourceEvent;
private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
private final List<SplitsAssignment<SplitT>> splitsAssignmentSequence;
@@ -61,6 +62,7 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
private final BlockingQueue<Callable<Future<?>>> oneTimeCallables;
private final List<Callable<Future<?>>> periodicCallables;
private final AtomicBoolean stoppedAcceptAsyncCalls;
+ private final boolean[] subtaskHasNoMoreSplits;
private final int parallelism;
@@ -78,6 +80,7 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
getExecutor(getThreadFactory("SplitEnumerator-worker", errorInWorkerThread));
this.mainExecutor = getExecutor(mainThreadFactory);
this.stoppedAcceptAsyncCalls = new AtomicBoolean(false);
+ this.subtaskHasNoMoreSplits = new boolean[parallelism];
}
@Override
@@ -120,7 +123,16 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
}
@Override
- public void signalNoMoreSplits(int subtask) {}
+ public void signalNoMoreSplits(int subtask) {
+ subtaskHasNoMoreSplits[subtask] = true;
+ }
+
+ @Override
+ public void signalIntermediateNoMoreSplits(int subtask) {}
+
+ public void resetNoMoreSplits(int subtask) {
+ subtaskHasNoMoreSplits[subtask] = false;
+ }
@Override
public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
@@ -229,6 +241,10 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
return splitsAssignmentSequence;
}
+ public boolean hasNoMoreSplits(int subtaskIndex) {
+ return subtaskHasNoMoreSplits[subtaskIndex];
+ }
+
// ------------- private helpers -------------
private void checkError() throws Throwable {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index cca65355a2c..59ab6d8c2d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -87,7 +88,7 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
@Internal
public class SourceCoordinatorContext<SplitT extends SourceSplit>
- implements SplitEnumeratorContext<SplitT>, AutoCloseable {
+ implements SplitEnumeratorContext<SplitT>, SupportsIntermediateNoMoreSplits, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
@@ -289,6 +290,19 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
"Failed to send 'NoMoreSplits' to reader " + subtask);
}
+ @Override
+ public void signalIntermediateNoMoreSplits(int subtask) {
+ checkSubtaskIndex(subtask);
+
+ // It's an intermediate noMoreSplit event, notify subtask to deal with this event.
+ callInCoordinatorThread(
+ () -> {
+ signalNoMoreSplitsToAttempts(subtask);
+ return null;
+ },
+ "Failed to send 'IntermediateNoMoreSplits' to reader " + subtask);
+ }
+
@Override
public <T> void callAsync(
Callable<T> callable,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index a466c0d0175..3f15e88bd2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -223,6 +223,26 @@ class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
assertThat(operatorCoordinatorContext.isJobFailed()).isFalse();
}
+ @Test
+ void testSupportsIntermediateNoMoreSplits() throws Exception {
+ sourceReady();
+ registerReaders();
+
+ SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+ context.assignSplits(splitsAssignment);
+ context.signalIntermediateNoMoreSplits(0);
+ context.signalIntermediateNoMoreSplits(1);
+ assertThat(context.hasNoMoreSplits(0)).isFalse();
+ assertThat(context.hasNoMoreSplits(1)).isFalse();
+ assertThat(context.hasNoMoreSplits(2)).isFalse();
+
+ context.signalNoMoreSplits(0);
+ context.signalNoMoreSplits(1);
+ assertThat(context.hasNoMoreSplits(0)).isTrue();
+ assertThat(context.hasNoMoreSplits(1)).isTrue();
+ assertThat(context.hasNoMoreSplits(2)).isFalse();
+ }
+
// ------------------------
private List<ReaderInfo> registerReaders() {