You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/27 01:35:19 UTC
[flink] 02/02: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6c7c30118ef26f98a7d422831fa8047f1fd9f98
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Thu Jan 26 11:56:02 2023 +0800
[FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
This closes #21697.
---
.../runtime/io/AbstractStreamTaskNetworkInput.java | 16 +++-
.../io/StreamMultipleInputProcessorFactory.java | 7 +-
.../runtime/io/StreamTaskNetworkInput.java | 7 +-
.../runtime/io/StreamTaskNetworkInputFactory.java | 10 ++-
.../runtime/io/StreamTwoInputProcessorFactory.java | 10 ++-
.../recovery/RescalingStreamTaskNetworkInput.java | 10 ++-
.../runtime/tasks/MultipleInputStreamTask.java | 3 +-
.../runtime/tasks/OneInputStreamTask.java | 3 +-
.../runtime/tasks/TwoInputStreamTask.java | 3 +-
.../runtime/io/StreamTaskNetworkInputTest.java | 97 +++++++++++++++++++++-
10 files changed, 145 insertions(+), 21 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index 6c59c9922bf..db5198bd716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import java.io.IOException;
@@ -62,12 +63,15 @@ public abstract class AbstractStreamTaskNetworkInput<
private InputChannelInfo lastChannel = null;
private R currentRecordDeserializer = null;
+ protected final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
+
public AbstractStreamTaskNetworkInput(
CheckpointedInputGate checkpointedInputGate,
TypeSerializer<T> inputSerializer,
StatusWatermarkValve statusWatermarkValve,
int inputIndex,
- Map<InputChannelInfo, R> recordDeserializers) {
+ Map<InputChannelInfo, R> recordDeserializers,
+ CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
super();
this.checkpointedInputGate = checkpointedInputGate;
deserializationDelegate =
@@ -82,6 +86,7 @@ public abstract class AbstractStreamTaskNetworkInput<
this.statusWatermarkValve = checkNotNull(statusWatermarkValve);
this.inputIndex = inputIndex;
this.recordDeserializers = checkNotNull(recordDeserializers);
+ this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
}
@Override
@@ -103,6 +108,9 @@ public abstract class AbstractStreamTaskNetworkInput<
if (result.isFullRecord()) {
processElement(deserializationDelegate.getInstance(), output);
+ if (canEmitBatchOfRecords.check()) {
+ continue;
+ }
return DataInputStatus.MORE_AVAILABLE;
}
}
@@ -115,7 +123,11 @@ public abstract class AbstractStreamTaskNetworkInput<
if (bufferOrEvent.get().isBuffer()) {
processBuffer(bufferOrEvent.get());
} else {
- return processEvent(bufferOrEvent.get());
+ DataInputStatus status = processEvent(bufferOrEvent.get());
+ if (status == DataInputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check()) {
+ continue;
+ }
+ return status;
}
} else {
if (checkpointedInputGate.isFinished()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index f06e9e71fe8..f318e796abe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -84,7 +85,8 @@ public class StreamMultipleInputProcessorFactory {
OperatorChain<?, ?> operatorChain,
InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
Function<Integer, StreamPartitioner<?>> gatePartitioners,
- TaskInfo taskInfo) {
+ TaskInfo taskInfo,
+ CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
checkNotNull(operatorChain);
List<Input> operatorInputs = mainOperator.getInputs();
@@ -116,7 +118,8 @@ public class StreamMultipleInputProcessorFactory {
i,
inflightDataRescalingDescriptor,
gatePartitioners,
- taskInfo);
+ taskInfo,
+ canEmitBatchOfRecords);
} else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
StreamConfig.SourceInputConfig sourceInput =
(StreamConfig.SourceInputConfig) configuredInput;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 793252f6b67..c4491b11559 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -65,13 +66,15 @@ public final class StreamTaskNetworkInput<T>
TypeSerializer<T> inputSerializer,
IOManager ioManager,
StatusWatermarkValve statusWatermarkValve,
- int inputIndex) {
+ int inputIndex,
+ CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
super(
checkpointedInputGate,
inputSerializer,
statusWatermarkValve,
inputIndex,
- getRecordDeserializers(checkpointedInputGate, ioManager));
+ getRecordDeserializers(checkpointedInputGate, ioManager),
+ canEmitBatchOfRecords);
}
// Initialize one deserializer per input channel
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java
index a9b46717d22..5895717d6ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import java.util.function.Function;
@@ -42,7 +43,8 @@ public class StreamTaskNetworkInputFactory {
int inputIndex,
InflightDataRescalingDescriptor rescalingDescriptorinflightDataRescalingDescriptor,
Function<Integer, StreamPartitioner<?>> gatePartitioners,
- TaskInfo taskInfo) {
+ TaskInfo taskInfo,
+ CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
return rescalingDescriptorinflightDataRescalingDescriptor.equals(
InflightDataRescalingDescriptor.NO_RESCALE)
? new StreamTaskNetworkInput<>(
@@ -50,7 +52,8 @@ public class StreamTaskNetworkInputFactory {
inputSerializer,
ioManager,
statusWatermarkValve,
- inputIndex)
+ inputIndex,
+ canEmitBatchOfRecords)
: new RescalingStreamTaskNetworkInput<>(
checkpointedInputGate,
inputSerializer,
@@ -59,6 +62,7 @@ public class StreamTaskNetworkInputFactory {
inputIndex,
rescalingDescriptorinflightDataRescalingDescriptor,
gatePartitioners,
- taskInfo);
+ taskInfo,
+ canEmitBatchOfRecords);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index ea296a3283c..3b2dc87a911 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -75,7 +76,8 @@ public class StreamTwoInputProcessorFactory {
Counter numRecordsIn,
InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
Function<Integer, StreamPartitioner<?>> gatePartitioners,
- TaskInfo taskInfo) {
+ TaskInfo taskInfo,
+ CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
checkNotNull(operatorChain);
@@ -91,7 +93,8 @@ public class StreamTwoInputProcessorFactory {
0,
inflightDataRescalingDescriptor,
gatePartitioners,
- taskInfo);
+ taskInfo,
+ canEmitBatchOfRecords);
TypeSerializer<IN2> typeSerializer2 = streamConfig.getTypeSerializerIn(1, userClassloader);
StreamTaskInput<IN2> input2 =
StreamTaskNetworkInputFactory.create(
@@ -103,7 +106,8 @@ public class StreamTwoInputProcessorFactory {
1,
inflightDataRescalingDescriptor,
gatePartitioners,
- taskInfo);
+ taskInfo,
+ canEmitBatchOfRecords);
InputSelectable inputSelectable =
streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
index 8c359a016ae..b95dded940e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
@@ -94,7 +95,8 @@ public final class RescalingStreamTaskNetworkInput<T>
int inputIndex,
InflightDataRescalingDescriptor inflightDataRescalingDescriptor,
Function<Integer, StreamPartitioner<?>> gatePartitioners,
- TaskInfo taskInfo) {
+ TaskInfo taskInfo,
+ CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
super(
checkpointedInputGate,
inputSerializer,
@@ -106,7 +108,8 @@ public final class RescalingStreamTaskNetworkInput<T>
ioManager,
inflightDataRescalingDescriptor,
gatePartitioners,
- taskInfo));
+ taskInfo),
+ canEmitBatchOfRecords);
this.ioManager = ioManager;
LOG.info(
@@ -159,7 +162,8 @@ public final class RescalingStreamTaskNetworkInput<T>
inputSerializer,
ioManager,
statusWatermarkValve,
- inputIndex);
+ inputIndex,
+ canEmitBatchOfRecords);
}
protected DemultiplexingRecordDeserializer<T> getActiveSerializer(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index b86c0b9d847..7883306e926 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -177,7 +177,8 @@ public class MultipleInputStreamTask<OUT>
operatorChain,
getEnvironment().getTaskStateManager().getInputRescalingDescriptor(),
gatePartitioners,
- getEnvironment().getTaskInfo());
+ getEnvironment().getTaskInfo(),
+ getCanEmitBatchOfRecords());
}
protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index ddcb0182633..025e04cfa08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -206,7 +206,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
.getInPhysicalEdges(getUserCodeClassLoader())
.get(gateIndex)
.getPartitioner(),
- getEnvironment().getTaskInfo());
+ getEnvironment().getTaskInfo(),
+ getCanEmitBatchOfRecords());
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 4658c454b12..544fd63d944 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -102,7 +102,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
setupNumRecordsInCounter(mainOperator),
getEnvironment().getTaskStateManager().getInputRescalingDescriptor(),
gatePartitioners,
- getEnvironment().getTaskInfo());
+ getEnvironment().getTaskInfo(),
+ getCanEmitBatchOfRecords());
}
// This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index ca699a08154..7338469171b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -67,7 +67,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -155,7 +157,8 @@ public class StreamTaskNetworkInputTest {
inSerializer,
ioManager,
new StatusWatermarkValve(numInputChannels),
- 0);
+ 0,
+ () -> false);
inputGate.sendEvent(
new CheckpointBarrier(
@@ -228,6 +231,92 @@ public class StreamTaskNetworkInputTest {
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.END_OF_RECOVERY);
}
+ @Test
+ public void testRecordsAreProcessedInBatches() throws Exception {
+ int numInputChannels = 2;
+ Random random = new Random();
+ LongSerializer inSerializer = LongSerializer.INSTANCE;
+ StreamTestSingleInputGate<Long> inputGate =
+ new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+ StreamTaskNetworkInput<Long> input =
+ new StreamTaskNetworkInput<>(
+ createCheckpointedInputGate(inputGate.getInputGate()),
+ inSerializer,
+ ioManager,
+ new StatusWatermarkValve(numInputChannels),
+ 0,
+ () -> true);
+ VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
+
+ // Test for records are processed in batches
+ int expectedElementCount = 3;
+ for (int i = 0; i < expectedElementCount; i++) {
+ inputGate.sendElement(new StreamRecord<>((long) i), random.nextInt(numInputChannels));
+ }
+
+ assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+ assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
+
+ // Test for records are processed in batches during processing event
+ inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, 0);
+ for (int i = 0; i < expectedElementCount; i++) {
+ inputGate.sendElement(new StreamRecord<>((long) i), 1);
+ }
+ assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+ assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
+ }
+
+ @Test
+ public void testBatchProcessingRecordsCanBeInterrupted() throws Exception {
+ int numInputChannels = 2;
+ Random random = new Random();
+ LongSerializer inSerializer = LongSerializer.INSTANCE;
+ StreamTestSingleInputGate<Long> inputGate =
+ new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+
+ AtomicBoolean canEmitBatchOfRecords = new AtomicBoolean();
+ StreamTaskNetworkInput<Long> input =
+ new StreamTaskNetworkInput<>(
+ createCheckpointedInputGate(inputGate.getInputGate()),
+ inSerializer,
+ ioManager,
+ new StatusWatermarkValve(numInputChannels),
+ 0,
+ canEmitBatchOfRecords::get);
+
+ VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
+
+ // Test for batch processing records can be interrupted and can be recovered
+ int expectedElementCount = 5;
+ for (int i = 0; i < expectedElementCount; i++) {
+ inputGate.sendElement(new StreamRecord<>((long) i), random.nextInt(numInputChannels));
+ }
+
+ canEmitBatchOfRecords.set(false);
+ assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+ assertThat(output.getNumberOfEmittedRecords()).isOne();
+
+ canEmitBatchOfRecords.set(true);
+ assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+ assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
+
+ // Test for batch processing records can be interrupted and can be recovered during
+ // processing event
+ inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, 0);
+ for (int i = 0; i < expectedElementCount; i++) {
+ inputGate.sendElement(new StreamRecord<>((long) i), 1);
+ }
+
+ // Just process the event
+ canEmitBatchOfRecords.set(false);
+ assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+ assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
+
+ canEmitBatchOfRecords.set(true);
+ assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+ assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
+ }
+
private BufferOrEvent createDataBuffer() throws IOException {
try (BufferBuilder bufferBuilder =
BufferBuilderTestUtils.createEmptyBufferBuilder(PAGE_SIZE)) {
@@ -245,7 +334,8 @@ public class StreamTaskNetworkInputTest {
LongSerializer.INSTANCE,
ioManager,
new StatusWatermarkValve(1),
- 0);
+ 0,
+ () -> false);
}
private static CheckpointedInputGate createCheckpointedInputGate(InputGate inputGate) {
@@ -337,7 +427,8 @@ public class StreamTaskNetworkInputTest {
inSerializer,
new StatusWatermarkValve(numInputChannels),
0,
- deserializers);
+ deserializers,
+ () -> false);
}
@Override