You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/26 06:55:48 UTC
[flink] 01/01: Revert IDLE/ACTIVE on records
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d83d0e92701866ca181c7c8d88a2d438a61e3be0
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed May 26 08:51:55 2021 +0200
Revert IDLE/ACTIVE on records
---
.../java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java | 2 +-
.../flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java | 2 --
2 files changed, 1 insertion(+), 3 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8d1d38c..05351b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -100,7 +100,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
private <X> void pushToRecordWriter(StreamRecord<X> record) {
// A record could have been generated somewhere in the pipeline even though an IDLE status was
// emitted. It might have originated from a timer or simply from a misbehaving operator.
- try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
+ try /*(AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus))*/ {
serializationDelegate.setInstance(record);
recordWriter.emit(serializationDelegate);
} catch (Exception e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 352557c..5ed7b4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -618,10 +618,8 @@ public class MultipleInputStreamTaskTest {
// FLIP-27 sources do not emit an ACTIVE status on new records, so we wrap a record with
// an ACTIVE/IDLE sequence
- expectedOutput.add(StreamStatus.ACTIVE);
expectedOutput.add(
new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP));
- expectedOutput.add(StreamStatus.IDLE);
expectedOutput.add(StreamStatus.ACTIVE); // activate source on new watermark
expectedOutput.add(new Watermark(initialTime + 10)); // forward W from source
expectedOutput.add(StreamStatus.IDLE); // go idle after reading all records