You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/12/27 01:27:51 UTC

[flink] branch master updated: [FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e4e6c68a4a [FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource
6e4e6c68a4a is described below

commit 6e4e6c68a4a179d932dacafb2771cc84f730bbc0
Author: Ran Tao <ch...@gmail.com>
AuthorDate: Tue Dec 20 17:28:15 2022 +0800

    [FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource
    
    This closes #21464.
---
 .../source/hybrid/HybridSourceSplitEnumerator.java | 34 +++++++++++++++---
 .../hybrid/HybridSourceSplitEnumeratorTest.java    | 37 +++++++++++++++++++
 .../source/SupportsIntermediateNoMoreSplits.java   | 41 ++++++++++++++++++++++
 .../source/mocks/MockSplitEnumeratorContext.java   | 20 +++++++++--
 .../coordinator/SourceCoordinatorContext.java      | 16 ++++++++-
 .../coordinator/SourceCoordinatorContextTest.java  | 20 +++++++++++
 6 files changed, 161 insertions(+), 7 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index c09653ec71b..b1eeec1327c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
 import org.apache.flink.util.Preconditions;
@@ -157,7 +158,7 @@ public class HybridSourceSplitEnumerator
                 LOG.debug("Restoring splits to subtask={} {}", subtaskId, splits);
                 context.assignSplits(
                         new SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
-                context.signalNoMoreSplits(subtaskId);
+                checkAndSignalNoMoreSplits(context, subtaskId, sourceIndex, sources.size());
             }
             if (splitsBySource.isEmpty()) {
                 pendingSplits.remove(subtaskId);
@@ -279,7 +280,11 @@ public class HybridSourceSplitEnumerator
         currentEnumeratorCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
         SplitEnumeratorContextProxy delegatingContext =
                 new SplitEnumeratorContextProxy(
-                        currentSourceIndex, context, readerSourceIndex, switchedSources);
+                        currentSourceIndex,
+                        context,
+                        readerSourceIndex,
+                        switchedSources,
+                        sources.size());
         try {
             if (restoredEnumeratorState == null) {
                 currentEnumerator = source.createEnumerator(delegatingContext);
@@ -315,16 +320,19 @@ public class HybridSourceSplitEnumerator
         private final int sourceIndex;
         private final Map<Integer, Integer> readerSourceIndex;
         private final SwitchedSources switchedSources;
+        private final int sourceSize;
 
         private SplitEnumeratorContextProxy(
                 int sourceIndex,
                 SplitEnumeratorContext<HybridSourceSplit> realContext,
                 Map<Integer, Integer> readerSourceIndex,
-                SwitchedSources switchedSources) {
+                SwitchedSources switchedSources,
+                int sourceSize) {
             this.realContext = realContext;
             this.sourceIndex = sourceIndex;
             this.readerSourceIndex = readerSourceIndex;
             this.switchedSources = switchedSources;
+            this.sourceSize = sourceSize;
         }
 
         @Override
@@ -392,7 +400,8 @@ public class HybridSourceSplitEnumerator
 
         @Override
         public void signalNoMoreSplits(int subtask) {
-            realContext.signalNoMoreSplits(subtask);
+            // intercept noMoreSplits signaled by the child source enumerators
+            checkAndSignalNoMoreSplits(realContext, subtask, sourceIndex, sourceSize);
         }
 
         @Override
@@ -414,4 +423,21 @@ public class HybridSourceSplitEnumerator
             realContext.runInCoordinatorThread(runnable);
         }
     }
+
+    private static void checkAndSignalNoMoreSplits(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            int subtaskId,
+            int sourceIndex,
+            int sourceSize) {
+        Preconditions.checkState(
+                context instanceof SupportsIntermediateNoMoreSplits,
+                "The split enumerator context %s must implement SupportsIntermediateNoMoreSplits "
+                        + "to be used in hybrid source scenario.",
+                context.getClass().getCanonicalName());
+        if (sourceIndex >= sourceSize - 1) {
+            context.signalNoMoreSplits(subtaskId);
+        } else {
+            ((SupportsIntermediateNoMoreSplits) context).signalIntermediateNoMoreSplits(subtaskId);
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 99d1d3d3f60..e4f268dedef 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -216,6 +216,43 @@ public class HybridSourceSplitEnumeratorTest {
         Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(0, se);
     }
 
+    @Test
+    public void testInterceptNoMoreSplitEvent() {
+        context = new MockSplitEnumeratorContext<>(2);
+        source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
+
+        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
+        enumerator.start();
+        // mock enumerator assigns splits once all readers are registered
+        // At this time, hasNoMoreSplit check will call context.signalIntermediateNoMoreSplits
+        registerReader(context, enumerator, SUBTASK0);
+        registerReader(context, enumerator, SUBTASK1);
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
+        assertThat(context.hasNoMoreSplits(0)).isFalse();
+        assertThat(context.hasNoMoreSplits(1)).isFalse();
+        splitFromSource0 =
+                context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
+
+        // task read finished, hasNoMoreSplit check will call context.signalNoMoreSplits, this is
+        // final finished event
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
+        assertThat(context.hasNoMoreSplits(0)).isTrue();
+        assertThat(context.hasNoMoreSplits(1)).isTrue();
+
+        // test add splits back, then SUBTASK0 restore splitFromSource0 split
+        // reset splits assignment & previous subtaskHasNoMoreSplits flag.
+        context.getSplitsAssignmentSequence().clear();
+        context.resetNoMoreSplits(0);
+        enumerator.addReader(SUBTASK0);
+        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+        assertThat(context.hasNoMoreSplits(0)).isFalse();
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
+        assertThat(context.hasNoMoreSplits(0)).isTrue();
+    }
+
     private static class UnderlyingEnumeratorWrapper
             implements SplitEnumerator<MockSourceSplit, Object> {
         private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsIntermediateNoMoreSplits.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsIntermediateNoMoreSplits.java
new file mode 100644
index 00000000000..8c58e6ee5bb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsIntermediateNoMoreSplits.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A decorative interface of {@link SplitEnumeratorContext} which allows to handle intermediate
+ * NoMoreSplits.
+ *
+ * <p>The split enumerator context must implement this interface if NoMoreSplits events need to be
+ * handled in cases where a subtask can have multiple child sources, e.g. hybrid source.
+ */
+@Internal
+public interface SupportsIntermediateNoMoreSplits {
+    /**
+     * Signals a subtask that it will not receive more splits for the current source, but will
+     * receive splits for the next sources. A common scenario is HybridSource. This indicates that
+     * the task has not truly finished reading.
+     *
+     * @param subtask The index of the operator's parallel subtask that shall be signaled it will
+     *     receive splits later.
+     */
+    void signalIntermediateNoMoreSplits(int subtask);
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
index 42aad607b37..6389f198763 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
 import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.ThrowableCatchingRunnable;
@@ -49,7 +50,7 @@ import java.util.function.BiConsumer;
 
 /** A mock class for {@link SplitEnumeratorContext}. */
 public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
-        implements SplitEnumeratorContext<SplitT>, AutoCloseable {
+        implements SplitEnumeratorContext<SplitT>, SupportsIntermediateNoMoreSplits, AutoCloseable {
     private final Map<Integer, List<SourceEvent>> sentSourceEvent;
     private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
     private final List<SplitsAssignment<SplitT>> splitsAssignmentSequence;
@@ -61,6 +62,7 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
     private final BlockingQueue<Callable<Future<?>>> oneTimeCallables;
     private final List<Callable<Future<?>>> periodicCallables;
     private final AtomicBoolean stoppedAcceptAsyncCalls;
+    private final boolean[] subtaskHasNoMoreSplits;
 
     private final int parallelism;
 
@@ -78,6 +80,7 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
                 getExecutor(getThreadFactory("SplitEnumerator-worker", errorInWorkerThread));
         this.mainExecutor = getExecutor(mainThreadFactory);
         this.stoppedAcceptAsyncCalls = new AtomicBoolean(false);
+        this.subtaskHasNoMoreSplits = new boolean[parallelism];
     }
 
     @Override
@@ -120,7 +123,16 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
     }
 
     @Override
-    public void signalNoMoreSplits(int subtask) {}
+    public void signalNoMoreSplits(int subtask) {
+        subtaskHasNoMoreSplits[subtask] = true;
+    }
+
+    @Override
+    public void signalIntermediateNoMoreSplits(int subtask) {}
+
+    public void resetNoMoreSplits(int subtask) {
+        subtaskHasNoMoreSplits[subtask] = false;
+    }
 
     @Override
     public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
@@ -229,6 +241,10 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
         return splitsAssignmentSequence;
     }
 
+    public boolean hasNoMoreSplits(int subtaskIndex) {
+        return subtaskHasNoMoreSplits[subtaskIndex];
+    }
+
     // ------------- private helpers -------------
 
     private void checkError() throws Throwable {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index cca65355a2c..59ab6d8c2d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -87,7 +88,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class SourceCoordinatorContext<SplitT extends SourceSplit>
-        implements SplitEnumeratorContext<SplitT>, AutoCloseable {
+        implements SplitEnumeratorContext<SplitT>, SupportsIntermediateNoMoreSplits, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
 
@@ -289,6 +290,19 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
                 "Failed to send 'NoMoreSplits' to reader " + subtask);
     }
 
+    @Override
+    public void signalIntermediateNoMoreSplits(int subtask) {
+        checkSubtaskIndex(subtask);
+
+        // It's an intermediate noMoreSplit event, notify subtask to deal with this event.
+        callInCoordinatorThread(
+                () -> {
+                    signalNoMoreSplitsToAttempts(subtask);
+                    return null;
+                },
+                "Failed to send 'IntermediateNoMoreSplits' to reader " + subtask);
+    }
+
     @Override
     public <T> void callAsync(
             Callable<T> callable,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index a466c0d0175..3f15e88bd2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -223,6 +223,26 @@ class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
         assertThat(operatorCoordinatorContext.isJobFailed()).isFalse();
     }
 
+    @Test
+    void testSupportsIntermediateNoMoreSplits() throws Exception {
+        sourceReady();
+        registerReaders();
+
+        SplitsAssignment<MockSourceSplit> splitsAssignment = getSplitsAssignment(2, 0);
+        context.assignSplits(splitsAssignment);
+        context.signalIntermediateNoMoreSplits(0);
+        context.signalIntermediateNoMoreSplits(1);
+        assertThat(context.hasNoMoreSplits(0)).isFalse();
+        assertThat(context.hasNoMoreSplits(1)).isFalse();
+        assertThat(context.hasNoMoreSplits(2)).isFalse();
+
+        context.signalNoMoreSplits(0);
+        context.signalNoMoreSplits(1);
+        assertThat(context.hasNoMoreSplits(0)).isTrue();
+        assertThat(context.hasNoMoreSplits(1)).isTrue();
+        assertThat(context.hasNoMoreSplits(2)).isFalse();
+    }
+
     // ------------------------
 
     private List<ReaderInfo> registerReaders() {