You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/26 06:55:48 UTC

[flink] 01/01: Revert IDLE/ACTIVE on records

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d83d0e92701866ca181c7c8d88a2d438a61e3be0
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed May 26 08:51:55 2021 +0200

    Revert IDLE/ACTIVE on records
---
 .../java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java  | 2 +-
 .../flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java      | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8d1d38c..05351b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -100,7 +100,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
         // record could've been generated somewhere in the pipeline even though an IDLE status was
         // emitted. It might've originated from a timer or just a wrong behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
+        try /*(AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus))*/ {
             serializationDelegate.setInstance(record);
             recordWriter.emit(serializationDelegate);
         } catch (Exception e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 352557c..5ed7b4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -618,10 +618,8 @@ public class MultipleInputStreamTaskTest {
 
             // FLIP-27 sources do not emit active status on new records, we wrap a record with
             // ACTIVE/IDLE sequence
-            expectedOutput.add(StreamStatus.ACTIVE);
             expectedOutput.add(
                     new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP));
-            expectedOutput.add(StreamStatus.IDLE);
             expectedOutput.add(StreamStatus.ACTIVE); // activate source on new watermark
             expectedOutput.add(new Watermark(initialTime + 10)); // forward W from source
             expectedOutput.add(StreamStatus.IDLE); // go idle after reading all records