Posted to commits@flink.apache.org by li...@apache.org on 2023/01/26 07:30:26 UTC

[flink] branch master updated (88cb0853b7b -> 560f3147038)

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 88cb0853b7b [hotfix][javadocs]Fix javadoc in Catalog Interface
     new 2d1510a9d55 [FLINK-30623][runtime][refactor] Using the CanEmitBatchOfRecordsChecker instead of Supplier<Boolean>
     new 877511b8ea2 [FLINK-30623][runtime][refactor] Using the canEmitBatchOfRecords instead of isEmitNextLoopDisabled in the SourceOperator
     new 560f3147038 [FLINK-30623][runtime][bug] The canEmitBatchOfRecords should check recordWriter and changelogWriterAvailabilityProvider are available

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...sultPartitionWriterWithAvailabilityHelper.java} | 18 +++------
 .../streaming/api/operators/SourceOperator.java    | 19 +++-------
 .../api/operators/SourceOperatorFactory.java       |  9 ++---
 .../runtime/io/StreamTaskSourceInput.java          |  5 ---
 .../runtime/tasks/MultipleInputStreamTask.java     | 17 +++++----
 .../flink/streaming/runtime/tasks/StreamTask.java  | 22 ++++++++---
 .../runtime/tasks/TwoInputStreamTask.java          | 18 ++++-----
 .../operators/source/TestingSourceOperator.java    |  2 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 44 ++++++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java      | 42 +++++++++++++++++++++
 .../runtime/tasks/TwoInputStreamTaskTest.java      | 44 ++++++++++++++++++++++
 11 files changed, 180 insertions(+), 60 deletions(-)
 copy flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/{AvailabilityTestResultPartitionWriter.java => ResultPartitionWriterWithAvailabilityHelper.java} (67%)


[flink] 02/03: [FLINK-30623][runtime][refactor] Using the canEmitBatchOfRecords instead of isEmitNextLoopDisabled in the SourceOperator

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 877511b8ea2b4ef62fc520a0d0bd9087f2f25c56
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Tue Jan 17 15:50:14 2023 +0800

    [FLINK-30623][runtime][refactor] Using the canEmitBatchOfRecords instead of isEmitNextLoopDisabled in the SourceOperator
    
    This closes #21690.
---
 .../flink/streaming/api/operators/SourceOperator.java  |  9 ---------
 .../streaming/runtime/io/StreamTaskSourceInput.java    |  5 -----
 .../runtime/tasks/MultipleInputStreamTask.java         | 17 +++++++++--------
 .../streaming/runtime/tasks/TwoInputStreamTask.java    | 18 +++++++++---------
 4 files changed, 18 insertions(+), 31 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 91e6bd3d714..f54b273faaf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -170,7 +170,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     private int numSplits;
     private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
     private final Set<String> currentlyPausedSplits = new HashSet<>();
-    private boolean isEmitNextLoopDisabled = false;
 
     private enum OperatingMode {
         READING,
@@ -412,9 +411,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         if (operatingMode != OperatingMode.READING) {
             return emitNextNotReading(output);
         }
-        if (isEmitNextLoopDisabled) {
-            return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
-        }
 
         InputStatus status;
         do {
@@ -572,11 +568,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         }
     }
 
-    // Configure SourceOperator#emitNext to emit at most one record to the given DataOutput.
-    public void disableEmitNextLoop() {
-        isEmitNextLoopDisabled = true;
-    }
-
     private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
         try {
             List<SplitT> newSplits = event.splits(splitSerializer);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
index 55863fee47e..54988606b15 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
@@ -157,9 +157,4 @@ public class StreamTaskSourceInput<T> implements StreamTaskInput<T>, Checkpointa
     public SourceOperator<T, ?> getOperator() {
         return operator;
     }
-
-    // Configure StreamTaskSourceInput#emitNext to emit at most one record to the given DataOutput.
-    public void disableEmitNextLoop() {
-        operator.disableEmitNextLoop();
-    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 6f1ed44e8a8..b86c0b9d847 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -82,14 +82,6 @@ public class MultipleInputStreamTask<OUT>
         StreamConfig configuration = getConfiguration();
         ClassLoader userClassLoader = getUserCodeClassLoader();
 
-        // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
-        // behavior of choosing an input every time a record is emitted. This behavior is good for
-        // fairness between input consumption. But it can reduce throughput due to added control
-        // flow cost on the per-record code path.
-        for (StreamTaskSourceInput<?> input : operatorChain.getSourceTaskInputs()) {
-            input.disableEmitNextLoop();
-        }
-
         InputConfig[] inputs = configuration.getInputs(userClassLoader);
 
         WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputs.length];
@@ -213,6 +205,15 @@ public class MultipleInputStreamTask<OUT>
         }
     }
 
+    // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
+    // behavior of choosing an input every time a record is emitted. This behavior is good for
+    // fairness between input consumption. But it can reduce throughput due to added control
+    // flow cost on the per-record code path.
+    @Override
+    public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
+        return () -> false;
+    }
+
     private boolean isSynchronous(SnapshotType snapshotType) {
         return snapshotType.isSavepoint() && ((SavepointType) snapshotType).isSynchronous();
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index e62ed44a99c..4658c454b12 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
@@ -62,14 +61,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
             List<IndexedInputGate> inputGates2,
             Function<Integer, StreamPartitioner<?>> gatePartitioners) {
 
-        // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
-        // behavior of choosing an input every time a record is emitted. This behavior is good for
-        // fairness between input consumption. But it can reduce throughput due to added control
-        // flow cost on the per-record code path.
-        for (StreamTaskSourceInput<?> input : operatorChain.getSourceTaskInputs()) {
-            input.disableEmitNextLoop();
-        }
-
         // create an input instance for each input
         checkpointBarrierHandler =
                 InputProcessorUtil.createCheckpointBarrierHandler(
@@ -113,4 +104,13 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
                         gatePartitioners,
                         getEnvironment().getTaskInfo());
     }
+
+    // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
+    // behavior of choosing an input every time a record is emitted. This behavior is good for
+    // fairness between input consumption. But it can reduce throughput due to added control
+    // flow cost on the per-record code path.
+    @Override
+    public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
+        return () -> false;
+    }
 }
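
For context on the fairness/throughput trade-off described in the comment above: with a checker that always returns false, the do/while loop in SourceOperator#emitNext runs its body exactly once per call, so the input processor can pick a (possibly different) input after every record. A minimal standalone sketch of that mechanism (toy code, not part of the commit; it assumes only the CanEmitBatchOfRecordsChecker interface from flink-streaming-java, everything else is illustrative):

    import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Queue;

    public final class EmitLoopSketch {

        public static void main(String[] args) {
            // Batching allowed: one call drains all three records.
            System.out.println(emitNext(newSource(), () -> true));  // prints 3
            // Batching disabled (the MultipleInputStreamTask / TwoInputStreamTask case):
            // one call emits exactly one record.
            System.out.println(emitNext(newSource(), () -> false)); // prints 1
        }

        // Toy stand-in for SourceOperator#emitNext: the body always runs once,
        // and the checker decides whether to keep draining within the same call.
        private static int emitNext(Queue<String> source, CanEmitBatchOfRecordsChecker checker) {
            int emitted = 0;
            boolean moreAvailable;
            do {
                source.poll(); // "emit" one record
                emitted++;
                moreAvailable = !source.isEmpty();
            } while (moreAvailable && checker.check());
            return emitted;
        }

        private static Queue<String> newSource() {
            return new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        }
    }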


[flink] 03/03: [FLINK-30623][runtime][bug] The canEmitBatchOfRecords should check recordWriter and changelogWriterAvailabilityProvider are available

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 560f314703858464f5089e24e065347d00704af5
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Wed Jan 25 16:24:46 2023 +0800

    [FLINK-30623][runtime][bug] The canEmitBatchOfRecords should check recordWriter and changelogWriterAvailabilityProvider are available
    
    This closes #21690.
---
 ...esultPartitionWriterWithAvailabilityHelper.java | 46 ++++++++++++++++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  | 12 ++++--
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 44 +++++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java      | 42 ++++++++++++++++++++
 .../runtime/tasks/TwoInputStreamTaskTest.java      | 44 +++++++++++++++++++++
 5 files changed, 184 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterWithAvailabilityHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterWithAvailabilityHelper.java
new file mode 100644
index 00000000000..8bc5f7321ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterWithAvailabilityHelper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A specific result partition writer implementation only used to control the output availability
+ * state in tests.
+ */
+public class ResultPartitionWriterWithAvailabilityHelper extends MockResultPartitionWriter {
+
+    private final AvailabilityHelper availabilityHelper;
+
+    public ResultPartitionWriterWithAvailabilityHelper(AvailabilityHelper availabilityHelper) {
+        this.availabilityHelper = availabilityHelper;
+    }
+
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        return availabilityHelper.getAvailableFuture();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return availabilityHelper.isAvailable();
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 63f21f86f4b..6b52ed5da3b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -547,9 +547,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         DataInputStatus status = inputProcessor.processInput();
         switch (status) {
             case MORE_AVAILABLE:
-                if (recordWriter.isAvailable()
-                        && (changelogWriterAvailabilityProvider == null
-                                || changelogWriterAvailabilityProvider.isAvailable())) {
+                if (taskIsAvailable()) {
                     return;
                 }
                 break;
@@ -994,8 +992,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         return this.mailboxProcessor::getMailboxExecutor;
     }
 
+    private boolean taskIsAvailable() {
+        return recordWriter.isAvailable()
+                && (changelogWriterAvailabilityProvider == null
+                        || changelogWriterAvailabilityProvider.isAvailable());
+    }
+
     public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
-        return () -> !this.mailboxProcessor.hasMail();
+        return () -> !this.mailboxProcessor.hasMail() && taskIsAvailable();
     }
 
     public final boolean isRunning() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 1a015d4fb84..8c46fd5fb92 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -50,12 +50,14 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.SavepointType;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -130,6 +132,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -892,6 +895,47 @@ public class MultipleInputStreamTaskTest {
         }
     }
 
+    /** The CanEmitBatchOfRecordsChecker should always return false for {@link MultipleInputStreamTask}. */
+    @Test
+    public void testCanEmitBatchOfRecords() throws Exception {
+        AvailabilityProvider.AvailabilityHelper availabilityHelper =
+                new AvailabilityProvider.AvailabilityHelper();
+        try (StreamTaskMailboxTestHarness<String> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
+                        .addAdditionalOutput(
+                                new ResultPartitionWriterWithAvailabilityHelper(availabilityHelper))
+                        .setupOperatorChain(new MapToStringMultipleInputOperatorFactory(3))
+                        .finishForSingletonOperatorChain(IntSerializer.INSTANCE)
+                        .build()) {
+            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker =
+                    testHarness.streamTask.getCanEmitBatchOfRecords();
+            testHarness.processAll();
+
+            availabilityHelper.resetAvailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // The canEmitBatchOfRecordsChecker should return false after the record writer
+            // becomes unavailable.
+            availabilityHelper.resetUnavailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // Restore record writer to available
+            availabilityHelper.resetAvailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // The canEmitBatchOfRecordsChecker should return false after a mail is added to the mailbox.
+            testHarness.streamTask.mainMailboxExecutor.execute(() -> {}, "mail");
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            testHarness.processAll();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+        }
+    }
+
     @Test
     public void testLatencyMarker() throws Exception {
         final Map<String, Metric> metrics = new ConcurrentHashMap<>();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index bc7bb15ae21..f8e60522b31 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -37,8 +37,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
@@ -61,6 +63,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Preconditions;
@@ -87,6 +90,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.cr
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -954,6 +958,44 @@ public class OneInputStreamTaskTest extends TestLogger {
         testHarness.waitForTaskCompletion();
     }
 
+    @Test
+    public void testCanEmitBatchOfRecords() throws Exception {
+        AvailabilityProvider.AvailabilityHelper availabilityHelper =
+                new AvailabilityProvider.AvailabilityHelper();
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(
+                                new ResultPartitionWriterWithAvailabilityHelper(availabilityHelper))
+                        .setupOperatorChain(new TestOperator())
+                        .finishForSingletonOperatorChain(IntSerializer.INSTANCE)
+                        .build()) {
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker =
+                    testHarness.streamTask.getCanEmitBatchOfRecords();
+            testHarness.processAll();
+
+            availabilityHelper.resetAvailable();
+            assertTrue(canEmitBatchOfRecordsChecker.check());
+
+            // The canEmitBatchOfRecordsChecker should return false after the record writer
+            // becomes unavailable.
+            availabilityHelper.resetUnavailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // Restore record writer to available
+            availabilityHelper.resetAvailable();
+            assertTrue(canEmitBatchOfRecordsChecker.check());
+
+            // The canEmitBatchOfRecordsChecker should return false after a mail is added to the mailbox.
+            testHarness.streamTask.mainMailboxExecutor.execute(() -> {}, "mail");
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            testHarness.processAll();
+            assertTrue(canEmitBatchOfRecordsChecker.check());
+        }
+    }
+
     static class WatermarkMetricOperator extends AbstractStreamOperator<String>
             implements OneInputStreamOperator<String, String> {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index d98827d3137..9d215a4b1a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
@@ -29,10 +30,12 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
@@ -75,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 /**
  * Tests for {@link TwoInputStreamTask}.
@@ -781,6 +785,46 @@ public class TwoInputStreamTaskTest {
         testHarness.waitForTaskCompletion();
     }
 
+    /** The CanEmitBatchOfRecordsChecker should always return false for {@link TwoInputStreamTask}. */
+    @Test
+    public void testCanEmitBatchOfRecords() throws Exception {
+        AvailabilityProvider.AvailabilityHelper availabilityHelper =
+                new AvailabilityProvider.AvailabilityHelper();
+        try (StreamTaskMailboxTestHarness<String> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+                        .addAdditionalOutput(
+                                new ResultPartitionWriterWithAvailabilityHelper(availabilityHelper))
+                        .setupOperatorChain(new DuplicatingOperator())
+                        .finishForSingletonOperatorChain(IntSerializer.INSTANCE)
+                        .build()) {
+            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker =
+                    testHarness.streamTask.getCanEmitBatchOfRecords();
+            testHarness.processAll();
+
+            availabilityHelper.resetAvailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // The canEmitBatchOfRecordsChecker should return false after the record writer
+            // becomes unavailable.
+            availabilityHelper.resetUnavailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // Restore record writer to available
+            availabilityHelper.resetAvailable();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            // The canEmitBatchOfRecordsChecker should return false after a mail is added to the mailbox.
+            testHarness.streamTask.mainMailboxExecutor.execute(() -> {}, "mail");
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+
+            testHarness.processAll();
+            assertFalse(canEmitBatchOfRecordsChecker.check());
+        }
+    }
+
     // This must only be used in one test, otherwise the static fields will be changed
     // by several tests concurrently
     private static class TestOpenCloseMapFunction
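
The new tests above toggle output availability to exercise the composed condition this commit adds to StreamTask#getCanEmitBatchOfRecords: batch emission requires an empty mailbox and an available output. A minimal standalone sketch of that composition (toy code, not part of the commit; it assumes only AvailabilityProvider.AvailabilityHelper and the CanEmitBatchOfRecordsChecker interface, with a plain boolean standing in for MailboxProcessor#hasMail):

    import org.apache.flink.runtime.io.AvailabilityProvider;
    import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;

    public final class ComposedCheckerSketch {

        public static void main(String[] args) {
            AvailabilityProvider.AvailabilityHelper writer =
                    new AvailabilityProvider.AvailabilityHelper();
            boolean[] hasMail = {false};

            // Batch emission is allowed only while the mailbox is empty
            // and the (record/changelog) writer side is available.
            CanEmitBatchOfRecordsChecker checker =
                    () -> !hasMail[0] && writer.isAvailable();

            writer.resetAvailable();
            System.out.println(checker.check()); // true: no mail, writer available

            writer.resetUnavailable();
            System.out.println(checker.check()); // false: writer is back-pressured

            writer.resetAvailable();
            hasMail[0] = true;
            System.out.println(checker.check()); // false: pending mail disables batching
        }
    }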


[flink] 01/03: [FLINK-30623][runtime][refactor] Using the CanEmitBatchOfRecordsChecker instead of Supplier<Boolean>

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2d1510a9d559a49806a60ececfd854dd53a6591d
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Wed Jan 25 14:26:17 2023 +0800

    [FLINK-30623][runtime][refactor] Using the CanEmitBatchOfRecordsChecker instead of Supplier<Boolean>
    
    This closes #21690.
---
 .../apache/flink/streaming/api/operators/SourceOperator.java | 10 +++++-----
 .../flink/streaming/api/operators/SourceOperatorFactory.java |  9 ++++-----
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 12 +++++++++---
 .../api/operators/source/TestingSourceOperator.java          |  2 +-
 4 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index fb5da2e725c..91e6bd3d714 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -76,7 +77,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 
 import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -192,7 +192,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private final boolean allowUnalignedSourceSplits;
 
-    private Supplier<Boolean> mailboxHasMail;
+    private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
 
     public SourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
@@ -204,7 +204,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
             Configuration configuration,
             String localHostname,
             boolean emitProgressiveWatermarks,
-            Supplier<Boolean> mailboxHasMail) {
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
 
         this.readerFactory = checkNotNull(readerFactory);
         this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -217,7 +217,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
         this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters();
         this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
-        this.mailboxHasMail = checkNotNull(mailboxHasMail);
+        this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
     }
 
     @Override
@@ -420,7 +420,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         do {
             status = sourceReader.pollNext(currentMainOutput);
         } while (status == InputStatus.MORE_AVAILABLE
-                && !mailboxHasMail.get()
+                && canEmitBatchOfRecords.check()
                 && !shouldWaitForAlignment());
         return convertToInternalStatus(status);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 4846a1ebbc9..479c5224365 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -32,12 +32,11 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nullable;
 
-import java.util.function.Supplier;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** The Factory class for {@link SourceOperator}. */
@@ -116,7 +115,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                                 .getTaskManagerInfo()
                                 .getTaskManagerExternalAddress(),
                         emitProgressiveWatermarks,
-                        parameters.getContainingTask().getMailboxHasMail());
+                        parameters.getContainingTask().getCanEmitBatchOfRecords());
 
         sourceOperator.setup(
                 parameters.getContainingTask(),
@@ -172,7 +171,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                     Configuration config,
                     String localHostName,
                     boolean emitProgressiveWatermarks,
-                    Supplier<Boolean> mailboxHasMail) {
+                    CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
 
         // jumping through generics hoops: cast the generics away to then cast them back more
         // strictly typed
@@ -194,6 +193,6 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                 config,
                 localHostName,
                 emitProgressiveWatermarks,
-                mailboxHasMail);
+                canEmitBatchOfRecords);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ee10e7929e3..63f21f86f4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -137,7 +137,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
-import java.util.function.Supplier;
 
 import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD;
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -995,8 +994,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         return this.mailboxProcessor::getMailboxExecutor;
     }
 
-    public Supplier<Boolean> getMailboxHasMail() {
-        return this.mailboxProcessor::hasMail;
+    public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
+        return () -> !this.mailboxProcessor.hasMail();
     }
 
     public final boolean isRunning() {
@@ -1769,4 +1768,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
     public final Environment getEnvironment() {
         return environment;
     }
+
+    /** Check whether records can be emitted in batch. */
+    @FunctionalInterface
+    public interface CanEmitBatchOfRecordsChecker {
+
+        boolean check();
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 8aca0222eae..1655ec7cdd7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -90,7 +90,7 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit>
                 new Configuration(),
                 "localhost",
                 emitProgressiveWatermarks,
-                () -> true);
+                () -> false);
 
         this.subtaskIndex = subtaskIndex;
         this.parallelism = parallelism;
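
One detail worth noting in the TestingSourceOperator hunk above: the constructor argument flips from () -> true to () -> false because the predicate's meaning is inverted, not because the test behaviour changes. The old Supplier<Boolean> answered "does the mailbox have mail?" and the poll loop negated it; the new checker answers "may a batch of records be emitted?" directly. A minimal sketch of that inversion (toy code, not part of the commit):

    import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;

    import java.util.function.Supplier;

    public final class PredicateInversionSketch {

        public static void main(String[] args) {
            // Before the refactor: the poll loop continued while the mailbox had NO mail.
            Supplier<Boolean> mailboxHasMail = () -> true;      // mail is pending ...
            boolean keepLoopingBefore = !mailboxHasMail.get();  // ... so do not keep looping

            // After the refactor: the poll loop continues while the checker says "yes".
            CanEmitBatchOfRecordsChecker canEmitBatch = () -> false; // batching disabled ...
            boolean keepLoopingAfter = canEmitBatch.check();         // ... so do not keep looping

            // Same effective behaviour in the test, despite the flipped literal.
            System.out.println(keepLoopingBefore + " " + keepLoopingAfter); // false false
        }
    }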