You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/27 01:35:19 UTC

[flink] 02/02: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6c7c30118ef26f98a7d422831fa8047f1fd9f98
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Thu Jan 26 11:56:02 2023 +0800

    [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
    
    This closes #21697.
---
 .../runtime/io/AbstractStreamTaskNetworkInput.java | 16 +++-
 .../io/StreamMultipleInputProcessorFactory.java    |  7 +-
 .../runtime/io/StreamTaskNetworkInput.java         |  7 +-
 .../runtime/io/StreamTaskNetworkInputFactory.java  | 10 ++-
 .../runtime/io/StreamTwoInputProcessorFactory.java | 10 ++-
 .../recovery/RescalingStreamTaskNetworkInput.java  | 10 ++-
 .../runtime/tasks/MultipleInputStreamTask.java     |  3 +-
 .../runtime/tasks/OneInputStreamTask.java          |  3 +-
 .../runtime/tasks/TwoInputStreamTask.java          |  3 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     | 97 +++++++++++++++++++++-
 10 files changed, 145 insertions(+), 21 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index 6c59c9922bf..db5198bd716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 
 import java.io.IOException;
@@ -62,12 +63,15 @@ public abstract class AbstractStreamTaskNetworkInput<
     private InputChannelInfo lastChannel = null;
     private R currentRecordDeserializer = null;
 
+    protected final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
+
     public AbstractStreamTaskNetworkInput(
             CheckpointedInputGate checkpointedInputGate,
             TypeSerializer<T> inputSerializer,
             StatusWatermarkValve statusWatermarkValve,
             int inputIndex,
-            Map<InputChannelInfo, R> recordDeserializers) {
+            Map<InputChannelInfo, R> recordDeserializers,
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
         super();
         this.checkpointedInputGate = checkpointedInputGate;
         deserializationDelegate =
@@ -82,6 +86,7 @@ public abstract class AbstractStreamTaskNetworkInput<
         this.statusWatermarkValve = checkNotNull(statusWatermarkValve);
         this.inputIndex = inputIndex;
         this.recordDeserializers = checkNotNull(recordDeserializers);
+        this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
     }
 
     @Override
@@ -103,6 +108,9 @@ public abstract class AbstractStreamTaskNetworkInput<
 
                 if (result.isFullRecord()) {
                     processElement(deserializationDelegate.getInstance(), output);
+                    if (canEmitBatchOfRecords.check()) {
+                        continue;
+                    }
                     return DataInputStatus.MORE_AVAILABLE;
                 }
             }
@@ -115,7 +123,11 @@ public abstract class AbstractStreamTaskNetworkInput<
                 if (bufferOrEvent.get().isBuffer()) {
                     processBuffer(bufferOrEvent.get());
                 } else {
-                    return processEvent(bufferOrEvent.get());
+                    DataInputStatus status = processEvent(bufferOrEvent.get());
+                    if (status == DataInputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check()) {
+                        continue;
+                    }
+                    return status;
                 }
             } else {
                 if (checkpointedInputGate.isFinished()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index f06e9e71fe8..f318e796abe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -84,7 +85,8 @@ public class StreamMultipleInputProcessorFactory {
             OperatorChain<?, ?> operatorChain,
             InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
             Function<Integer, StreamPartitioner<?>> gatePartitioners,
-            TaskInfo taskInfo) {
+            TaskInfo taskInfo,
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
         checkNotNull(operatorChain);
 
         List<Input> operatorInputs = mainOperator.getInputs();
@@ -116,7 +118,8 @@ public class StreamMultipleInputProcessorFactory {
                                 i,
                                 inflightDataRescalingDescriptor,
                                 gatePartitioners,
-                                taskInfo);
+                                taskInfo,
+                                canEmitBatchOfRecords);
             } else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
                 StreamConfig.SourceInputConfig sourceInput =
                         (StreamConfig.SourceInputConfig) configuredInput;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 793252f6b67..c4491b11559 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -65,13 +66,15 @@ public final class StreamTaskNetworkInput<T>
             TypeSerializer<T> inputSerializer,
             IOManager ioManager,
             StatusWatermarkValve statusWatermarkValve,
-            int inputIndex) {
+            int inputIndex,
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
         super(
                 checkpointedInputGate,
                 inputSerializer,
                 statusWatermarkValve,
                 inputIndex,
-                getRecordDeserializers(checkpointedInputGate, ioManager));
+                getRecordDeserializers(checkpointedInputGate, ioManager),
+                canEmitBatchOfRecords);
     }
 
     // Initialize one deserializer per input channel
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java
index a9b46717d22..5895717d6ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 
 import java.util.function.Function;
@@ -42,7 +43,8 @@ public class StreamTaskNetworkInputFactory {
             int inputIndex,
             InflightDataRescalingDescriptor rescalingDescriptorinflightDataRescalingDescriptor,
             Function<Integer, StreamPartitioner<?>> gatePartitioners,
-            TaskInfo taskInfo) {
+            TaskInfo taskInfo,
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
         return rescalingDescriptorinflightDataRescalingDescriptor.equals(
                         InflightDataRescalingDescriptor.NO_RESCALE)
                 ? new StreamTaskNetworkInput<>(
@@ -50,7 +52,8 @@ public class StreamTaskNetworkInputFactory {
                         inputSerializer,
                         ioManager,
                         statusWatermarkValve,
-                        inputIndex)
+                        inputIndex,
+                        canEmitBatchOfRecords)
                 : new RescalingStreamTaskNetworkInput<>(
                         checkpointedInputGate,
                         inputSerializer,
@@ -59,6 +62,7 @@ public class StreamTaskNetworkInputFactory {
                         inputIndex,
                         rescalingDescriptorinflightDataRescalingDescriptor,
                         gatePartitioners,
-                        taskInfo);
+                        taskInfo,
+                        canEmitBatchOfRecords);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index ea296a3283c..3b2dc87a911 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -75,7 +76,8 @@ public class StreamTwoInputProcessorFactory {
             Counter numRecordsIn,
             InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
             Function<Integer, StreamPartitioner<?>> gatePartitioners,
-            TaskInfo taskInfo) {
+            TaskInfo taskInfo,
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
 
         checkNotNull(operatorChain);
 
@@ -91,7 +93,8 @@ public class StreamTwoInputProcessorFactory {
                         0,
                         inflightDataRescalingDescriptor,
                         gatePartitioners,
-                        taskInfo);
+                        taskInfo,
+                        canEmitBatchOfRecords);
         TypeSerializer<IN2> typeSerializer2 = streamConfig.getTypeSerializerIn(1, userClassloader);
         StreamTaskInput<IN2> input2 =
                 StreamTaskNetworkInputFactory.create(
@@ -103,7 +106,8 @@ public class StreamTwoInputProcessorFactory {
                         1,
                         inflightDataRescalingDescriptor,
                         gatePartitioners,
-                        taskInfo);
+                        taskInfo,
+                        canEmitBatchOfRecords);
 
         InputSelectable inputSelectable =
                 streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
index 8c359a016ae..b95dded940e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
@@ -94,7 +95,8 @@ public final class RescalingStreamTaskNetworkInput<T>
             int inputIndex,
             InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
             Function<Integer, StreamPartitioner<?>> gatePartitioners,
-            TaskInfo taskInfo) {
+            TaskInfo taskInfo,
+            CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
         super(
                 checkpointedInputGate,
                 inputSerializer,
@@ -106,7 +108,8 @@ public final class RescalingStreamTaskNetworkInput<T>
                         ioManager,
                         inflightDataRescalingDescriptor,
                         gatePartitioners,
-                        taskInfo));
+                        taskInfo),
+                canEmitBatchOfRecords);
         this.ioManager = ioManager;
 
         LOG.info(
@@ -159,7 +162,8 @@ public final class RescalingStreamTaskNetworkInput<T>
                 inputSerializer,
                 ioManager,
                 statusWatermarkValve,
-                inputIndex);
+                inputIndex,
+                canEmitBatchOfRecords);
     }
 
     protected DemultiplexingRecordDeserializer<T> getActiveSerializer(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index b86c0b9d847..7883306e926 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -177,7 +177,8 @@ public class MultipleInputStreamTask<OUT>
                         operatorChain,
                         getEnvironment().getTaskStateManager().getInputRescalingDescriptor(),
                         gatePartitioners,
-                        getEnvironment().getTaskInfo());
+                        getEnvironment().getTaskInfo(),
+                        getCanEmitBatchOfRecords());
     }
 
     protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index ddcb0182633..025e04cfa08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -206,7 +206,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
                                 .getInPhysicalEdges(getUserCodeClassLoader())
                                 .get(gateIndex)
                                 .getPartitioner(),
-                getEnvironment().getTaskInfo());
+                getEnvironment().getTaskInfo(),
+                getCanEmitBatchOfRecords());
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 4658c454b12..544fd63d944 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -102,7 +102,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
                         setupNumRecordsInCounter(mainOperator),
                         getEnvironment().getTaskStateManager().getInputRescalingDescriptor(),
                         gatePartitioners,
-                        getEnvironment().getTaskInfo());
+                        getEnvironment().getTaskInfo(),
+                        getCanEmitBatchOfRecords());
     }
 
     // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index ca699a08154..7338469171b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -67,7 +67,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -155,7 +157,8 @@ public class StreamTaskNetworkInputTest {
                         inSerializer,
                         ioManager,
                         new StatusWatermarkValve(numInputChannels),
-                        0);
+                        0,
+                        () -> false);
 
         inputGate.sendEvent(
                 new CheckpointBarrier(
@@ -228,6 +231,92 @@ public class StreamTaskNetworkInputTest {
         assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.END_OF_RECOVERY);
     }
 
+    @Test
+    public void testRecordsAreProcessedInBatches() throws Exception {
+        int numInputChannels = 2;
+        Random random = new Random();
+        LongSerializer inSerializer = LongSerializer.INSTANCE;
+        StreamTestSingleInputGate<Long> inputGate =
+                new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+        StreamTaskNetworkInput<Long> input =
+                new StreamTaskNetworkInput<>(
+                        createCheckpointedInputGate(inputGate.getInputGate()),
+                        inSerializer,
+                        ioManager,
+                        new StatusWatermarkValve(numInputChannels),
+                        0,
+                        () -> true);
+        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
+
+        // Test that all available records are emitted by a single emitNext() call
+        int expectedElementCount = 3;
+        for (int i = 0; i < expectedElementCount; i++) {
+            inputGate.sendElement(new StreamRecord<>((long) i), random.nextInt(numInputChannels));
+        }
+
+        assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+        assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
+
+        // Test that batching continues after an event is processed in between
+        inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, 0);
+        for (int i = 0; i < expectedElementCount; i++) {
+            inputGate.sendElement(new StreamRecord<>((long) i), 1);
+        }
+        assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+        assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
+    }
+
+    @Test
+    public void testBatchProcessingRecordsCanBeInterrupted() throws Exception {
+        int numInputChannels = 2;
+        Random random = new Random();
+        LongSerializer inSerializer = LongSerializer.INSTANCE;
+        StreamTestSingleInputGate<Long> inputGate =
+                new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+
+        AtomicBoolean canEmitBatchOfRecords = new AtomicBoolean();
+        StreamTaskNetworkInput<Long> input =
+                new StreamTaskNetworkInput<>(
+                        createCheckpointedInputGate(inputGate.getInputGate()),
+                        inSerializer,
+                        ioManager,
+                        new StatusWatermarkValve(numInputChannels),
+                        0,
+                        canEmitBatchOfRecords::get);
+
+        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
+
+        // Test that batch processing of records can be interrupted and then resumed
+        int expectedElementCount = 5;
+        for (int i = 0; i < expectedElementCount; i++) {
+            inputGate.sendElement(new StreamRecord<>((long) i), random.nextInt(numInputChannels));
+        }
+
+        canEmitBatchOfRecords.set(false);
+        assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        assertThat(output.getNumberOfEmittedRecords()).isOne();
+
+        canEmitBatchOfRecords.set(true);
+        assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+        assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
+
+        // Test that batch processing can be interrupted and then resumed when an event is
+        // processed in between
+        inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, 0);
+        for (int i = 0; i < expectedElementCount; i++) {
+            inputGate.sendElement(new StreamRecord<>((long) i), 1);
+        }
+
+            // With batching disabled, only the event is processed; no new records are emitted
+        canEmitBatchOfRecords.set(false);
+        assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
+
+        canEmitBatchOfRecords.set(true);
+        assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+        assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
+    }
+
     private BufferOrEvent createDataBuffer() throws IOException {
         try (BufferBuilder bufferBuilder =
                 BufferBuilderTestUtils.createEmptyBufferBuilder(PAGE_SIZE)) {
@@ -245,7 +334,8 @@ public class StreamTaskNetworkInputTest {
                 LongSerializer.INSTANCE,
                 ioManager,
                 new StatusWatermarkValve(1),
-                0);
+                0,
+                () -> false);
     }
 
     private static CheckpointedInputGate createCheckpointedInputGate(InputGate inputGate) {
@@ -337,7 +427,8 @@ public class StreamTaskNetworkInputTest {
                     inSerializer,
                     new StatusWatermarkValve(numInputChannels),
                     0,
-                    deserializers);
+                    deserializers,
+                    () -> false);
         }
 
         @Override