You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2023/12/19 11:00:48 UTC

(flink) 02/09: [FLINK-33810][runtime] SourceCoordinator notify SourceOperator when backlog is changed

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2bf39a26bc575f8b50c338486f0f52a9aa7ddee3
Author: sxnan <su...@gmail.com>
AuthorDate: Thu Nov 16 15:14:17 2023 +0800

    [FLINK-33810][runtime] SourceCoordinator notify SourceOperator when backlog is changed
---
 .../source/coordinator/SourceCoordinator.java      |  7 +++
 .../coordinator/SourceCoordinatorContext.java      | 22 ++++++++
 .../source/event/IsProcessingBacklogEvent.java     | 58 ++++++++++++++++++++++
 .../coordination/EventReceivingTasks.java          | 11 ++--
 .../MockOperatorCoordinatorContext.java            |  2 +-
 .../coordinator/SourceCoordinatorContextTest.java  | 23 +++++++++
 6 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index faeac9a8dc4..b69f8172ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
@@ -607,6 +608,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         context.registerSourceReader(subtask, attemptNumber, event.location());
         if (!subtaskReaderExisted) {
             enumerator.addReader(event.subtaskId());
+
+            final Boolean isBacklog = context.isBacklog().getAsBoolean();
+            if (isBacklog != null) {
+                context.sendEventToSourceOperatorIfTaskReady(
+                        subtask, new IsProcessingBacklogEvent(isBacklog));
+            }
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 492cdf19e28..b42223d040d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -35,10 +35,12 @@ import org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGrou
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.ThrowableCatchingRunnable;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -112,6 +114,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     private final boolean supportsConcurrentExecutionAttempts;
     private final boolean[] subtaskHasNoMoreSplits;
     private volatile boolean closed;
+    private volatile TernaryBoolean backlog = TernaryBoolean.UNDEFINED;
 
     public SourceCoordinatorContext(
             SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
@@ -370,6 +373,17 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         if (checkpointCoordinator != null) {
             checkpointCoordinator.setIsProcessingBacklog(operatorID, isProcessingBacklog);
         }
+        backlog = TernaryBoolean.fromBoolean(isProcessingBacklog);
+        callInCoordinatorThread(
+                () -> {
+                    final IsProcessingBacklogEvent isProcessingBacklogEvent =
+                            new IsProcessingBacklogEvent(isProcessingBacklog);
+                    for (int i = 0; i < getCoordinatorContext().currentParallelism(); i++) {
+                        sendEventToSourceOperatorIfTaskReady(i, isProcessingBacklogEvent);
+                    }
+                    return null;
+                },
+                "Failed to send BacklogEvent to reader.");
     }
 
     // --------- Package private additional methods for the SourceCoordinator ------------
@@ -629,6 +643,14 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         }
     }
 
+    /**
+     * Returns whether the Source is processing backlog data. UNDEFINED is returned if it is not set
+     * by the {@link #setIsProcessingBacklog} method.
+     */
+    public TernaryBoolean isBacklog() {
+        return backlog;
+    }
+
     /** Maintains the subtask gateways for different execution attempts of different subtasks. */
     private static class SubtaskGateways {
         private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsProcessingBacklogEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsProcessingBacklogEvent.java
new file mode 100644
index 00000000000..cf74258e1bd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsProcessingBacklogEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.Objects;
+
+/** A source event that notifies the source operator of the backlog status. */
+public class IsProcessingBacklogEvent implements OperatorEvent {
+    private static final long serialVersionUID = 1L;
+    private final boolean isProcessingBacklog;
+
+    public IsProcessingBacklogEvent(boolean isProcessingBacklog) {
+        this.isProcessingBacklog = isProcessingBacklog;
+    }
+
+    public boolean isProcessingBacklog() {
+        return isProcessingBacklog;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final IsProcessingBacklogEvent that = (IsProcessingBacklogEvent) o;
+        return Objects.equals(isProcessingBacklog, that.isProcessingBacklog);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isProcessingBacklog);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("BacklogEvent (backlog='%s')", isProcessingBacklog);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index 3d3df195856..15659cb5c6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -96,10 +96,13 @@ public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory {
     }
 
     public List<OperatorEvent> getSentEventsForSubtask(int subtaskIndex) {
-        return events.stream()
-                .filter((evt) -> evt.subtask == subtaskIndex)
-                .map((evt) -> evt.event)
-                .collect(Collectors.toList());
+
+        // Create a new array list to avoid concurrent modification while processing the events
+        return new ArrayList<>(events)
+                .stream()
+                        .filter((evt) -> evt.subtask == subtaskIndex)
+                        .map((evt) -> evt.event)
+                        .collect(Collectors.toList());
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
index 1355d1186a2..f7f368eb863 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -95,7 +95,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 
     @Override
     public CheckpointCoordinator getCheckpointCoordinator() {
-        throw new UnsupportedOperationException();
+        return null;
     }
 
     // -------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 473330d5bc8..0d4c1a02d4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 
 import org.junit.jupiter.api.Test;
@@ -261,4 +262,26 @@ class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
 
         return infos;
     }
+
+    @Test
+    void testSetIsProcessingBacklog() throws Exception {
+        sourceReady();
+        registerReader(0, 0);
+        context.setIsProcessingBacklog(true);
+
+        for (int i = 0; i < context.currentParallelism(); ++i) {
+            final List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(i);
+            assertThat(events.get(events.size() - 1)).isEqualTo(new IsProcessingBacklogEvent(true));
+        }
+
+        registerReader(1, 0);
+        context.setIsProcessingBacklog(false);
+        registerReader(2, 0);
+
+        for (int i = 0; i < context.currentParallelism(); ++i) {
+            final List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(i);
+            assertThat(events.get(events.size() - 1))
+                    .isEqualTo(new IsProcessingBacklogEvent(false));
+        }
+    }
 }