You are viewing a plain-text version of this content. The canonical version is available at the original mailing-list archive link, which was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by xt...@apache.org on 2023/12/19 11:00:46 UTC

(flink) branch master updated (4cc24c1dd17 -> f6bbf1cf364)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 4cc24c1dd17 [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission
     new 72a0ed61085 [hotfix][runtime] CollectionDataInput handles WatermarkStatus
     new 2bf39a26bc5 [FLINK-33810][runtime] SourceCoordinator notify SourceOperator when backlog is changed
     new 42df7ebc8c1 [FLINK-33810][runtime] Introduce RecordAttributes
     new f852c836f6b [FLINK-33810][runtime] Introduce processRecordAttributes method to Input and TwoInputStreamOperator
     new 28c2d2925ca [FLINK-33810][runtime] Introduce emitRecordAttributes method to the Output
     new 89a8a1693ba [FLINK-33202][runtime] SourceOperator send Watermark and RecordAttributes downstream when backlog status changed
     new f92e2c8be84 [FLINK-33810][runtime] Introduce and implement emitRecordAttributes method of PushingAsyncDataInput#DataOutput
     new 43fed747e9f [FLINK-33810][runtime] AbstractStreamTaskNetworkInput process RecordAttributes
     new f6bbf1cf364 [FLINK-33810][runtime] AbstractStreamOperator implements processRecordAttributes

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/source/reader/SourceReaderBaseTest.java   |   4 +
 .../flink/state/api/output/BoundedStreamTask.java  |   4 +
 .../operators/StateBootstrapWrapperOperator.java   |   4 +
 .../source/coordinator/SourceCoordinator.java      |   7 ++
 .../coordinator/SourceCoordinatorContext.java      |  22 +++++
 ...entEvent.java => IsProcessingBacklogEvent.java} |  24 +++--
 .../coordination/EventReceivingTasks.java          |  11 ++-
 .../MockOperatorCoordinatorContext.java            |   2 +-
 .../coordinator/SourceCoordinatorContextTest.java  |  23 +++++
 .../streaming/api/operators/AbstractInput.java     |   6 ++
 .../api/operators/AbstractStreamOperator.java      |  34 +++++++
 .../api/operators/AbstractStreamOperatorV2.java    |  15 +++
 .../streaming/api/operators/CountingOutput.java    |   6 ++
 .../flink/streaming/api/operators/Input.java       |   9 ++
 .../flink/streaming/api/operators/Output.java      |   9 ++
 .../streaming/api/operators/SourceOperator.java    |  10 ++
 .../api/operators/TimestampedCollector.java        |   6 ++
 .../api/operators/TwoInputStreamOperator.java      |  16 +++
 .../operators/sort/MultiInputSortingDataInput.java |   8 ++
 .../api/operators/sort/SortingDataInput.java       |   7 ++
 .../source/NoOpTimestampsAndWatermarks.java        |   5 +
 .../source/ProgressiveTimestampsAndWatermarks.java |   5 +-
 .../operators/source/TimestampsAndWatermarks.java  |   3 +
 .../runtime/io/AbstractStreamTaskNetworkInput.java |  43 +++++---
 .../streaming/runtime/io/FinishedDataOutput.java   |   6 ++
 .../runtime/io/PushingAsyncDataInput.java          |   3 +
 .../runtime/io/RecordAttributesCombiner.java       | 110 +++++++++++++++++++++
 .../streaming/runtime/io/RecordWriterOutput.java   |  11 +++
 .../io/StreamMultipleInputProcessorFactory.java    |   6 ++
 .../runtime/io/StreamTwoInputProcessorFactory.java |  10 ++
 .../runtime/streamrecord/RecordAttributes.java     |  71 +++++++++++++
 .../streamrecord/RecordAttributesBuilder.java      |  78 +++++++++++++++
 .../runtime/streamrecord/StreamElement.java        |  22 ++++-
 .../streamrecord/StreamElementSerializer.java      |  10 ++
 .../runtime/tasks/BroadcastingOutputCollector.java |   8 ++
 .../streaming/runtime/tasks/ChainingOutput.java    |  10 ++
 .../tasks/FinishedOnRestoreMainOperatorOutput.java |   8 ++
 .../runtime/tasks/OneInputStreamTask.java          |   6 ++
 .../runtime/tasks/SourceOperatorStreamTask.java    |   6 ++
 .../runtime/tasks/StreamIterationTail.java         |   4 +
 .../api/operators/AbstractStreamOperatorTest.java  |  59 +++++++++++
 .../operators/AbstractStreamOperatorV2Test.java    |  35 +++++++
 .../api/operators/SourceOperatorTest.java          |  82 +++++++++++++++
 .../api/operators/SourceOperatorTestContext.java   |  10 +-
 .../api/operators/sort/CollectingDataOutput.java   |   6 ++
 .../api/operators/sort/CollectionDataInput.java    |   4 +
 .../sort/LargeSortingDataInputITCase.java          |   4 +
 .../api/operators/source/CollectingDataOutput.java |   6 ++
 .../runtime/io/RecordAttributesCombinerTest.java   |  84 ++++++++++++++++
 .../runtime/io/StreamTaskNetworkInputTest.java     |  54 ++++++++++
 .../streamrecord/StreamElementSerializerTest.java  |   5 +
 .../watermarkstatus/StatusWatermarkValveTest.java  |   6 ++
 .../util/AbstractStreamOperatorTestHarness.java    |   6 ++
 .../flink/streaming/util/CollectorOutput.java      |   6 ++
 .../apache/flink/streaming/util/MockOutput.java    |   6 ++
 .../util/MultiInputStreamOperatorTestHarness.java  |   6 ++
 .../util/OneInputStreamOperatorTestHarness.java    |  11 +++
 .../streaming/util/SourceOperatorTestHarness.java  |   6 ++
 .../util/TwoInputStreamOperatorTestHarness.java    |   9 ++
 .../table/planner/runtime/utils/TimeTestUtil.scala |   4 +-
 .../multipleinput/output/BroadcastingOutput.java   |   8 ++
 ...gSecondInputOfTwoInputStreamOperatorOutput.java |  10 ++
 .../FirstInputOfTwoInputStreamOperatorOutput.java  |  10 ++
 .../output/OneInputStreamOperatorOutput.java       |  10 ++
 .../SecondInputOfTwoInputStreamOperatorOutput.java |  10 ++
 .../multipleinput/output/BlackHoleOutput.java      |   6 ++
 .../over/NonBufferOverWindowOperatorTest.java      |   6 ++
 67 files changed, 1087 insertions(+), 34 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/source/event/{WatermarkAlignmentEvent.java => IsProcessingBacklogEvent.java} (66%)
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombiner.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributesBuilder.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombinerTest.java


(flink) 02/09: [FLINK-33810][runtime] SourceCoordinator notify SourceOperator when backlog is changed

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2bf39a26bc575f8b50c338486f0f52a9aa7ddee3
Author: sxnan <su...@gmail.com>
AuthorDate: Thu Nov 16 15:14:17 2023 +0800

    [FLINK-33810][runtime] SourceCoordinator notify SourceOperator when backlog is changed
---
 .../source/coordinator/SourceCoordinator.java      |  7 +++
 .../coordinator/SourceCoordinatorContext.java      | 22 ++++++++
 .../source/event/IsProcessingBacklogEvent.java     | 58 ++++++++++++++++++++++
 .../coordination/EventReceivingTasks.java          | 11 ++--
 .../MockOperatorCoordinatorContext.java            |  2 +-
 .../coordinator/SourceCoordinatorContextTest.java  | 23 +++++++++
 6 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index faeac9a8dc4..b69f8172ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
@@ -607,6 +608,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         context.registerSourceReader(subtask, attemptNumber, event.location());
         if (!subtaskReaderExisted) {
             enumerator.addReader(event.subtaskId());
+
+            final Boolean isBacklog = context.isBacklog().getAsBoolean();
+            if (isBacklog != null) {
+                context.sendEventToSourceOperatorIfTaskReady(
+                        subtask, new IsProcessingBacklogEvent(isBacklog));
+            }
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 492cdf19e28..b42223d040d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -35,10 +35,12 @@ import org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGrou
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.ThrowableCatchingRunnable;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -112,6 +114,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     private final boolean supportsConcurrentExecutionAttempts;
     private final boolean[] subtaskHasNoMoreSplits;
     private volatile boolean closed;
+    private volatile TernaryBoolean backlog = TernaryBoolean.UNDEFINED;
 
     public SourceCoordinatorContext(
             SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
@@ -370,6 +373,17 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         if (checkpointCoordinator != null) {
             checkpointCoordinator.setIsProcessingBacklog(operatorID, isProcessingBacklog);
         }
+        backlog = TernaryBoolean.fromBoolean(isProcessingBacklog);
+        callInCoordinatorThread(
+                () -> {
+                    final IsProcessingBacklogEvent isProcessingBacklogEvent =
+                            new IsProcessingBacklogEvent(isProcessingBacklog);
+                    for (int i = 0; i < getCoordinatorContext().currentParallelism(); i++) {
+                        sendEventToSourceOperatorIfTaskReady(i, isProcessingBacklogEvent);
+                    }
+                    return null;
+                },
+                "Failed to send BacklogEvent to reader.");
     }
 
     // --------- Package private additional methods for the SourceCoordinator ------------
@@ -629,6 +643,14 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         }
     }
 
+    /**
+     * Returns whether the Source is processing backlog data. UNDEFINED is returned if it is not set
+     * by the {@link #setIsProcessingBacklog} method.
+     */
+    public TernaryBoolean isBacklog() {
+        return backlog;
+    }
+
     /** Maintains the subtask gateways for different execution attempts of different subtasks. */
     private static class SubtaskGateways {
         private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsProcessingBacklogEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsProcessingBacklogEvent.java
new file mode 100644
index 00000000000..cf74258e1bd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsProcessingBacklogEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.Objects;
+
+/** A source event that notify the source operator of the backlog status. */
+public class IsProcessingBacklogEvent implements OperatorEvent {
+    private static final long serialVersionUID = 1L;
+    private final boolean isProcessingBacklog;
+
+    public IsProcessingBacklogEvent(boolean isProcessingBacklog) {
+        this.isProcessingBacklog = isProcessingBacklog;
+    }
+
+    public boolean isProcessingBacklog() {
+        return isProcessingBacklog;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final IsProcessingBacklogEvent that = (IsProcessingBacklogEvent) o;
+        return Objects.equals(isProcessingBacklog, that.isProcessingBacklog);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isProcessingBacklog);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("BacklogEvent (backlog='%s')", isProcessingBacklog);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index 3d3df195856..15659cb5c6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -96,10 +96,13 @@ public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory {
     }
 
     public List<OperatorEvent> getSentEventsForSubtask(int subtaskIndex) {
-        return events.stream()
-                .filter((evt) -> evt.subtask == subtaskIndex)
-                .map((evt) -> evt.event)
-                .collect(Collectors.toList());
+
+        // Create a new array list to avoid concurrent modification during processing the events
+        return new ArrayList<>(events)
+                .stream()
+                        .filter((evt) -> evt.subtask == subtaskIndex)
+                        .map((evt) -> evt.event)
+                        .collect(Collectors.toList());
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
index 1355d1186a2..f7f368eb863 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -95,7 +95,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 
     @Override
     public CheckpointCoordinator getCheckpointCoordinator() {
-        throw new UnsupportedOperationException();
+        return null;
     }
 
     // -------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 473330d5bc8..0d4c1a02d4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 
 import org.junit.jupiter.api.Test;
@@ -261,4 +262,26 @@ class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
 
         return infos;
     }
+
+    @Test
+    void testSetIsProcessingBacklog() throws Exception {
+        sourceReady();
+        registerReader(0, 0);
+        context.setIsProcessingBacklog(true);
+
+        for (int i = 0; i < context.currentParallelism(); ++i) {
+            final List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(i);
+            assertThat(events.get(events.size() - 1)).isEqualTo(new IsProcessingBacklogEvent(true));
+        }
+
+        registerReader(1, 0);
+        context.setIsProcessingBacklog(false);
+        registerReader(2, 0);
+
+        for (int i = 0; i < context.currentParallelism(); ++i) {
+            final List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(i);
+            assertThat(events.get(events.size() - 1))
+                    .isEqualTo(new IsProcessingBacklogEvent(false));
+        }
+    }
 }


(flink) 08/09: [FLINK-33810][runtime] AbstractStreamTaskNetworkInput process RecordAttributes

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43fed747e9f6d8e4c4b898134e866721810607f7
Author: sxnan <su...@gmail.com>
AuthorDate: Tue Nov 21 16:20:03 2023 +0800

    [FLINK-33810][runtime] AbstractStreamTaskNetworkInput process RecordAttributes
---
 .../runtime/io/AbstractStreamTaskNetworkInput.java |  43 +++++---
 .../runtime/io/RecordAttributesCombiner.java       | 110 +++++++++++++++++++++
 .../runtime/io/RecordAttributesCombinerTest.java   |  84 ++++++++++++++++
 .../runtime/io/StreamTaskNetworkInputTest.java     |  50 ++++++++++
 4 files changed, 276 insertions(+), 11 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index db5198bd716..90eb16aaf43 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -60,6 +60,7 @@ public abstract class AbstractStreamTaskNetworkInput<
     protected final StatusWatermarkValve statusWatermarkValve;
 
     protected final int inputIndex;
+    private final RecordAttributesCombiner recordAttributesCombiner;
     private InputChannelInfo lastChannel = null;
     private R currentRecordDeserializer = null;
 
@@ -87,6 +88,8 @@ public abstract class AbstractStreamTaskNetworkInput<
         this.inputIndex = inputIndex;
         this.recordDeserializers = checkNotNull(recordDeserializers);
         this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
+        this.recordAttributesCombiner =
+                new RecordAttributesCombiner(checkpointedInputGate.getNumberOfInputChannels());
     }
 
     @Override
@@ -107,8 +110,9 @@ public abstract class AbstractStreamTaskNetworkInput<
                 }
 
                 if (result.isFullRecord()) {
-                    processElement(deserializationDelegate.getInstance(), output);
-                    if (canEmitBatchOfRecords.check()) {
+                    final boolean breakBatchEmitting =
+                            processElement(deserializationDelegate.getInstance(), output);
+                    if (canEmitBatchOfRecords.check() && !breakBatchEmitting) {
                         continue;
                     }
                     return DataInputStatus.MORE_AVAILABLE;
@@ -141,19 +145,36 @@ public abstract class AbstractStreamTaskNetworkInput<
         }
     }
 
-    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
-        if (recordOrMark.isRecord()) {
-            output.emitRecord(recordOrMark.asRecord());
-        } else if (recordOrMark.isWatermark()) {
+    /**
+     * Process the given stream element and returns whether to stop processing and return from the
+     * emitNext method so that the emitNext is invoked again right after processing the element to
+     * allow behavior change in emitNext method. For example, the behavior of emitNext may need to
+     * change right after process a RecordAttributes.
+     */
+    private boolean processElement(StreamElement streamElement, DataOutput<T> output)
+            throws Exception {
+        if (streamElement.isRecord()) {
+            output.emitRecord(streamElement.asRecord());
+            return false;
+        } else if (streamElement.isWatermark()) {
             statusWatermarkValve.inputWatermark(
-                    recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
-        } else if (recordOrMark.isLatencyMarker()) {
-            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
-        } else if (recordOrMark.isWatermarkStatus()) {
+                    streamElement.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
+            return false;
+        } else if (streamElement.isLatencyMarker()) {
+            output.emitLatencyMarker(streamElement.asLatencyMarker());
+            return false;
+        } else if (streamElement.isWatermarkStatus()) {
             statusWatermarkValve.inputWatermarkStatus(
-                    recordOrMark.asWatermarkStatus(),
+                    streamElement.asWatermarkStatus(),
                     flattenedChannelIndices.get(lastChannel),
                     output);
+            return false;
+        } else if (streamElement.isRecordAttributes()) {
+            recordAttributesCombiner.inputRecordAttributes(
+                    streamElement.asRecordAttributes(),
+                    flattenedChannelIndices.get(lastChannel),
+                    output);
+            return true;
         } else {
             throw new UnsupportedOperationException("Unknown type of StreamElement");
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombiner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombiner.java
new file mode 100644
index 00000000000..c5303575200
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombiner.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/** RecordAttributesValve combine RecordAttributes from different input channels. */
+public class RecordAttributesCombiner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecordAttributesCombiner.class);
+
+    private final RecordAttributes[] allChannelRecordAttributes;
+    private int backlogChannelCnt = 0;
+    private int backlogUndefinedChannelCnt;
+    private RecordAttributes lastOutputAttributes = null;
+
+    public RecordAttributesCombiner(int numInputChannels) {
+        this.backlogUndefinedChannelCnt = numInputChannels;
+        this.allChannelRecordAttributes = new RecordAttributes[numInputChannels];
+    }
+
+    public void inputRecordAttributes(
+            RecordAttributes recordAttributes, int channelIdx, DataOutput<?> output)
+            throws Exception {
+        LOG.debug("RecordAttributes: {} from channel idx: {}", recordAttributes, channelIdx);
+        RecordAttributes lastChannelRecordAttributes = allChannelRecordAttributes[channelIdx];
+        allChannelRecordAttributes[channelIdx] = recordAttributes;
+
+        // skip if the input RecordAttributes of the input channel is the same as the last.
+        if (recordAttributes.equals(lastChannelRecordAttributes)) {
+            return;
+        }
+
+        final RecordAttributesBuilder builder =
+                new RecordAttributesBuilder(Collections.emptyList());
+
+        Boolean isBacklog = combineIsBacklog(lastChannelRecordAttributes, recordAttributes);
+        if (isBacklog == null) {
+            if (lastOutputAttributes == null) {
+                return;
+            } else {
+                isBacklog = lastOutputAttributes.isBacklog();
+            }
+        }
+        builder.setBacklog(isBacklog);
+
+        final RecordAttributes outputAttribute = builder.build();
+        if (!outputAttribute.equals(lastOutputAttributes)) {
+            output.emitRecordAttributes(outputAttribute);
+            lastOutputAttributes = outputAttribute;
+        }
+    }
+
+    /**
+     * If any of the input channels is backlog, the combined RecordAttributes is backlog. Return
+     * null if the isBacklog cannot be determined, i.e. when none of the input channel is processing
+     * backlog and some input channels are undefined.
+     */
+    private Boolean combineIsBacklog(
+            RecordAttributes lastRecordAttributes, RecordAttributes recordAttributes) {
+
+        if (lastRecordAttributes == null) {
+            backlogUndefinedChannelCnt--;
+            if (recordAttributes.isBacklog()) {
+                backlogChannelCnt++;
+            }
+        } else if (lastRecordAttributes.isBacklog() != recordAttributes.isBacklog()) {
+            if (recordAttributes.isBacklog()) {
+                backlogChannelCnt++;
+            } else {
+                backlogChannelCnt--;
+            }
+        }
+
+        // The input is processing backlog if any channel is processing backlog
+        if (backlogChannelCnt > 0) {
+            return true;
+        }
+
+        // None of the input channel is processing backlog and some are undefined
+        if (backlogUndefinedChannelCnt > 0) {
+            return null;
+        }
+
+        // All the input channels are defined and not processing backlog
+        return false;
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombinerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombinerTest.java
new file mode 100644
index 00000000000..da97a2eb1c5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombinerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordAttributesCombiner}. */
+public class RecordAttributesCombinerTest {
+
+    @Test
+    public void testCombineRecordAttributes() throws Exception {
+        final RecordAttributesCombiner combiner = new RecordAttributesCombiner(3);
+        CollectingDataOutput<Object> collectingDataOutput = new CollectingDataOutput<>();
+        final RecordAttributes backlogRecordAttribute =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+        final RecordAttributes nonBacklogRecordAttribute =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+
+        // Switch from null to backlog
+        combiner.inputRecordAttributes(backlogRecordAttribute, 0, collectingDataOutput);
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 0, collectingDataOutput);
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 0, collectingDataOutput);
+
+        combiner.inputRecordAttributes(backlogRecordAttribute, 1, collectingDataOutput);
+        combiner.inputRecordAttributes(backlogRecordAttribute, 2, collectingDataOutput);
+
+        // Switch from backlog to non-backlog
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 1, collectingDataOutput);
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 2, collectingDataOutput);
+
+        // Switch from non-backlog to backlog
+        combiner.inputRecordAttributes(backlogRecordAttribute, 1, collectingDataOutput);
+
+        assertThat(collectingDataOutput.getEvents())
+                .containsExactly(
+                        backlogRecordAttribute, nonBacklogRecordAttribute, backlogRecordAttribute);
+    }
+
+    @Test
+    public void testCombinerOnlyOutputNonBacklogWhenAllInputChannelAreNonBacklog()
+            throws Exception {
+        final RecordAttributesCombiner combiner = new RecordAttributesCombiner(2);
+        CollectingDataOutput<Object> collectingDataOutput = new CollectingDataOutput<>();
+
+        final RecordAttributes backlogRecordAttribute =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+        final RecordAttributes nonBacklogRecordAttribute =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 0, collectingDataOutput);
+        assertThat(collectingDataOutput.getEvents()).isEmpty();
+
+        combiner.inputRecordAttributes(backlogRecordAttribute, 0, collectingDataOutput);
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 1, collectingDataOutput);
+        assertThat(collectingDataOutput.getEvents()).containsExactly(backlogRecordAttribute);
+
+        combiner.inputRecordAttributes(nonBacklogRecordAttribute, 0, collectingDataOutput);
+        assertThat(collectingDataOutput.getEvents())
+                .containsExactly(backlogRecordAttribute, nonBacklogRecordAttribute);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 481bdaff082..ecdbf27aa3d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarri
 import org.apache.flink.streaming.runtime.io.checkpointing.UpstreamRecoveryTracker;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -318,6 +320,54 @@ public class StreamTaskNetworkInputTest {
         assertThat(output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
     }
 
+    @Test
+    public void testProcessRecordAttributes() throws Exception {
+        int numInputChannels = 2;
+        LongSerializer inSerializer = LongSerializer.INSTANCE;
+        StreamTestSingleInputGate<Long> inputGate =
+                new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+        StreamTaskNetworkInput<Long> input =
+                new StreamTaskNetworkInput<>(
+                        createCheckpointedInputGate(inputGate.getInputGate()),
+                        inSerializer,
+                        ioManager,
+                        new StatusWatermarkValve(numInputChannels),
+                        0,
+                        () -> true);
+
+        final CollectingDataOutput<Long> output = new CollectingDataOutput<>();
+
+        final RecordAttributes backlogRecordAttributes =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+        final RecordAttributes nonBacklogRecordAttributes =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+        final StreamRecord<Long> element1 = new StreamRecord<>(0L, 0);
+        final StreamRecord<Long> element2 = new StreamRecord<>(1L, 1);
+        final StreamRecord<Long> element3 = new StreamRecord<>(2L, 2);
+
+        inputGate.sendElement(element1, 1);
+        inputGate.sendElement(backlogRecordAttributes, 0);
+        input.emitNext(output);
+        assertThat(output.getEvents()).containsExactly(element1, backlogRecordAttributes);
+        output.getEvents().clear();
+
+        inputGate.sendElement(backlogRecordAttributes, 1);
+        inputGate.sendElement(element2, 1);
+        inputGate.sendElement(element3, 1);
+        input.emitNext(output);
+        assertThat(output.getEvents()).isEmpty();
+
+        input.emitNext(output);
+        assertThat(output.getEvents()).containsExactly(element2, element3);
+        output.getEvents().clear();
+
+        inputGate.sendElement(nonBacklogRecordAttributes, 0);
+        inputGate.sendElement(nonBacklogRecordAttributes, 1);
+        input.emitNext(output);
+        input.emitNext(output);
+        assertThat(output.getEvents()).containsExactly(nonBacklogRecordAttributes);
+    }
+
     private BufferOrEvent createDataBuffer() throws IOException {
         try (BufferBuilder bufferBuilder =
                 BufferBuilderTestUtils.createEmptyBufferBuilder(PAGE_SIZE)) {


(flink) 04/09: [FLINK-33810][runtime] Introduce processRecordAttributes method to Input and TwoInputStreamOperator

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f852c836f6b221bd6be7e51ed78734e322e2ef3d
Author: sxnan <su...@gmail.com>
AuthorDate: Mon Nov 20 18:22:17 2023 +0800

    [FLINK-33810][runtime] Introduce processRecordAttributes method to Input and TwoInputStreamOperator
---
 .../org/apache/flink/streaming/api/operators/Input.java  |  9 +++++++++
 .../streaming/api/operators/TwoInputStreamOperator.java  | 16 ++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Input.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Input.java
index d6ca8e6186a..784db9db322 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Input.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Input.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -69,4 +71,11 @@ public interface Input<IN> {
      * guaranteed to not be called concurrently with other methods of the operator.
      */
     void setKeyContextElement(StreamRecord<IN> record) throws Exception;
+
+    /**
+     * Processes a {@link RecordAttributes} that arrived at this input. This method is guaranteed to
+     * not be called concurrently with other methods of the operator.
+     */
+    @Experimental
+    default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
index 8cb5ca342dc..f4c2e022d3f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -98,4 +100,18 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OU
      * @see org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus
      */
     void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception;
+
+    /**
+     * Processes a {@link RecordAttributes} that arrived on the first input of this operator. This
+     * method is guaranteed to not be called concurrently with other methods of the operator.
+     */
+    @Experimental
+    default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}
+
+    /**
+     * Processes a {@link RecordAttributes} that arrived on the second input of this operator. This
+     * method is guaranteed to not be called concurrently with other methods of the operator.
+     */
+    @Experimental
+    default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
 }


(flink) 06/09: [FLINK-33202][runtime] SourceOperator send Watermark and RecordAttributes downstream when backlog status changed

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89a8a1693baab945e1a502caaf6cec4442db4467
Author: sxnan <su...@gmail.com>
AuthorDate: Thu Nov 16 15:20:36 2023 +0800

    [FLINK-33202][runtime] SourceOperator send Watermark and RecordAttributes downstream when backlog status changed
---
 .../streaming/api/operators/SourceOperator.java    | 10 +++
 .../source/NoOpTimestampsAndWatermarks.java        |  5 ++
 .../source/ProgressiveTimestampsAndWatermarks.java |  5 +-
 .../operators/source/TimestampsAndWatermarks.java  |  3 +
 .../api/operators/SourceOperatorTest.java          | 82 ++++++++++++++++++++++
 .../api/operators/SourceOperatorTestContext.java   | 10 ++-
 6 files changed, 112 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index c4f624e443c..02d3128152d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
@@ -56,6 +57,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -569,6 +571,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
             sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
         } else if (event instanceof NoMoreSplitsEvent) {
             sourceReader.notifyNoMoreSplits();
+        } else if (event instanceof IsProcessingBacklogEvent) {
+            if (eventTimeLogic != null) {
+                eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());
+            }
+            output.emitRecordAttributes(
+                    new RecordAttributesBuilder(Collections.emptyList())
+                            .setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog())
+                            .build());
         } else {
             throw new IllegalStateException("Received unexpected operator event " + event);
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
index a8c75046e7f..3d21512a143 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
@@ -63,6 +63,11 @@ public class NoOpTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T
         // no periodic watermarks
     }
 
+    @Override
+    public void emitImmediateWatermark(long wallClockTimestamp) {
+        // do nothing
+    }
+
     // ------------------------------------------------------------------------
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index 14e6b6a7eb9..98507c096b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -139,7 +139,7 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
 
         periodicEmitHandle =
                 timeService.scheduleWithFixedDelay(
-                        this::triggerPeriodicEmit,
+                        this::emitImmediateWatermark,
                         periodicWatermarkInterval,
                         periodicWatermarkInterval);
     }
@@ -152,7 +152,8 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
         }
     }
 
-    void triggerPeriodicEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
+    @Override
+    public void emitImmediateWatermark(@SuppressWarnings("unused") long wallClockTimestamp) {
         if (currentPerSplitOutputs != null) {
             currentPerSplitOutputs.emitPeriodicWatermark();
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index cd41ca9ecb4..84bb7457601 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -80,6 +80,9 @@ public interface TimestampsAndWatermarks<T> {
     /** Stops emitting periodic watermarks. */
     void stopPeriodicWatermarkEmits();
 
+    /** Emit a watermark immediately. */
+    void emitImmediateWatermark(long wallClockTimestamp);
+
     // ------------------------------------------------------------------------
     //  factories
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 5d3054feb69..edb4a6150a7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -18,6 +18,7 @@ limitations under the License.
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
@@ -26,12 +27,21 @@ import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.streaming.util.CollectorOutput;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.After;
@@ -40,11 +50,13 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -192,4 +204,74 @@ public class SourceOperatorTest {
         operator.notifyCheckpointAborted(100L);
         assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0));
     }
+
+    @Test
+    public void testHandleBacklogEvent() throws Exception {
+        List<StreamElement> outputStreamElements = new ArrayList<>();
+        context =
+                new SourceOperatorTestContext(
+                        false,
+                        WatermarkStrategy.<Integer>forMonotonousTimestamps()
+                                .withTimestampAssigner((element, recordTimestamp) -> element),
+                        new CollectorOutput<>(outputStreamElements));
+        operator = context.getOperator();
+        operator.initializeState(context.createStateContext());
+        operator.open();
+
+        MockSourceSplit newSplit = new MockSourceSplit(2);
+        newSplit.addRecord(1);
+        newSplit.addRecord(1001);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
+        final DataOutputToOutput<Integer> output = new DataOutputToOutput<>(operator.output);
+        operator.emitNext(output);
+        operator.handleOperatorEvent(new IsProcessingBacklogEvent(true));
+
+        operator.emitNext(output);
+        operator.handleOperatorEvent(new IsProcessingBacklogEvent(false));
+
+        assertThat(outputStreamElements)
+                .containsExactly(
+                        new StreamRecord<>(1, 1),
+                        new Watermark(0),
+                        new RecordAttributes(true),
+                        new StreamRecord<>(1001, 1001),
+                        new Watermark(1000),
+                        new RecordAttributes(false));
+    }
+
+    private static class DataOutputToOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
+
+        private final Output<StreamRecord<T>> output;
+
+        DataOutputToOutput(Output<StreamRecord<T>> output) {
+            this.output = output;
+        }
+
+        @Override
+        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
+            output.collect(streamRecord);
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) throws Exception {
+            output.emitWatermark(watermark);
+        }
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+            output.emitWatermarkStatus(watermarkStatus);
+        }
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+            output.emitLatencyMarker(latencyMarker);
+        }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            output.emitRecordAttributes(recordAttributes);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index 7f83e7304af..96bb3dce5e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -71,7 +72,14 @@ public class SourceOperatorTestContext implements AutoCloseable {
 
     public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> watermarkStrategy)
             throws Exception {
+        this(idle, watermarkStrategy, new MockOutput<>(new ArrayList<>()));
+    }
 
+    public SourceOperatorTestContext(
+            boolean idle,
+            WatermarkStrategy<Integer> watermarkStrategy,
+            Output<StreamRecord<Integer>> output)
+            throws Exception {
         mockSourceReader = new MockSourceReader(idle, idle);
         mockGateway = new MockOperatorEventGateway();
         timeService = new TestProcessingTimeService();
@@ -88,7 +96,7 @@ public class SourceOperatorTestContext implements AutoCloseable {
         operator.setup(
                 new SourceOperatorStreamTask<Integer>(env),
                 new MockStreamConfig(new Configuration(), 1),
-                new MockOutput<>(new ArrayList<>()));
+                output);
         operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend()));
     }
 


(flink) 09/09: [FLINK-33810][runtime] AbstractStreamOperator implements processRecordAttributes

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6bbf1cf364a3b4d04e6b6ddc522bad6431b43c4
Author: sxnan <su...@gmail.com>
AuthorDate: Tue Nov 21 14:02:56 2023 +0800

    [FLINK-33810][runtime] AbstractStreamOperator implements processRecordAttributes
    
    This closes #23919
---
 .../streaming/api/operators/AbstractInput.java     |  6 +++
 .../api/operators/AbstractStreamOperator.java      | 34 +++++++++++++
 .../api/operators/AbstractStreamOperatorV2.java    | 15 ++++++
 .../api/operators/AbstractStreamOperatorTest.java  | 59 ++++++++++++++++++++++
 .../operators/AbstractStreamOperatorV2Test.java    | 35 +++++++++++++
 .../util/MultiInputStreamOperatorTestHarness.java  |  6 +++
 .../util/OneInputStreamOperatorTestHarness.java    | 11 ++++
 .../util/TwoInputStreamOperatorTestHarness.java    |  9 ++++
 .../table/planner/runtime/utils/TimeTestUtil.scala |  4 +-
 9 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
index f8d0a74fa4d..9982d12c8e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -76,6 +77,11 @@ public abstract class AbstractInput<IN, OUT> implements Input<IN>, KeyContextHan
         owner.internalSetKeyContextElement(record, stateKeySelector);
     }
 
+    @Override
+    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+        owner.processRecordAttributes(recordAttributes, inputId);
+    }
+
     @Override
     public boolean hasKeyContext() {
         return stateKeySelector != null;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 78fb35af4e0..6d02567e416 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -50,6 +51,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -61,6 +64,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Locale;
 import java.util.Optional;
 
@@ -148,6 +153,11 @@ public abstract class AbstractStreamOperator<OUT>
 
     protected transient ProcessingTimeService processingTimeService;
 
+    protected transient RecordAttributes lastRecordAttributes1 =
+            RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
+    protected transient RecordAttributes lastRecordAttributes2 =
+            RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
+
     // ------------------------------------------------------------------------
     //  Life Cycle
     // ------------------------------------------------------------------------
@@ -649,4 +659,28 @@ public abstract class AbstractStreamOperator<OUT>
     protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
         return Optional.ofNullable(timeServiceManager);
     }
+
+    @Experimental
+    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+        output.emitRecordAttributes(
+                new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build());
+    }
+
+    @Experimental
+    public void processRecordAttributes1(RecordAttributes recordAttributes) {
+        lastRecordAttributes1 = recordAttributes;
+        output.emitRecordAttributes(
+                new RecordAttributesBuilder(
+                                Arrays.asList(lastRecordAttributes1, lastRecordAttributes2))
+                        .build());
+    }
+
+    @Experimental
+    public void processRecordAttributes2(RecordAttributes recordAttributes) {
+        lastRecordAttributes2 = recordAttributes;
+        output.emitRecordAttributes(
+                new RecordAttributesBuilder(
+                                Arrays.asList(lastRecordAttributes1, lastRecordAttributes2))
+                        .build());
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index a10206ba0e5..128c2790c90 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -50,6 +50,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -60,6 +62,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.Optional;
 
@@ -97,6 +100,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
 
     protected final LatencyStats latencyStats;
     protected final ProcessingTimeService processingTimeService;
+    protected final RecordAttributes[] lastRecordAttributes;
 
     private StreamOperatorStateHandler stateHandler;
     private InternalTimeServiceManager<?> timeServiceManager;
@@ -114,6 +118,10 @@ public abstract class AbstractStreamOperatorV2<OUT>
                         environment.getTaskManagerInfo().getConfiguration(),
                         parameters.getContainingTask().getIndexInSubtaskGroup());
         processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService());
+        lastRecordAttributes = new RecordAttributes[numberOfInputs];
+        for (int i = 0; i < numberOfInputs; ++i) {
+            lastRecordAttributes[i] = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
+        }
         executionConfig = parameters.getContainingTask().getExecutionConfig();
         userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader();
         cancelables = parameters.getContainingTask().getCancelables();
@@ -496,6 +504,13 @@ public abstract class AbstractStreamOperatorV2<OUT>
         }
     }
 
+    public void processRecordAttributes(RecordAttributes recordAttributes, int inputId)
+            throws Exception {
+        lastRecordAttributes[inputId - 1] = recordAttributes;
+        output.emitRecordAttributes(
+                new RecordAttributesBuilder(Arrays.asList(lastRecordAttributes)).build());
+    }
+
     @Override
     public OperatorID getOperatorID() {
         return config.getOperatorID();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 6fccbe42752..afc52d609af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
@@ -51,6 +53,7 @@ import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -537,6 +540,62 @@ public class AbstractStreamOperatorTest {
         }
     }
 
+    @Test
+    public void testTwoInputsRecordAttributesForwarding() throws Exception {
+        final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        KeySelector<Long, Integer> dummyKeySelector = l -> 0;
+        try (KeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
+                new KeyedTwoInputStreamOperatorTestHarness<>(
+                        testOperator,
+                        dummyKeySelector,
+                        dummyKeySelector,
+                        BasicTypeInfo.INT_TYPE_INFO)) {
+            testHarness.setup();
+            testHarness.open();
+
+            final RecordAttributes backlogRecordAttributes =
+                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+            final RecordAttributes nonBacklogRecordAttributes =
+                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+
+            testHarness.processRecordAttributes1(backlogRecordAttributes);
+            testHarness.processRecordAttributes2(backlogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            TestHarnessUtil.assertOutputEquals(
+                    "Output was not correct", expectedOutput, testHarness.getOutput());
+            testHarness.processRecordAttributes1(nonBacklogRecordAttributes);
+            testHarness.processRecordAttributes2(nonBacklogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(nonBacklogRecordAttributes);
+            TestHarnessUtil.assertOutputEquals(
+                    "Output was not correct", expectedOutput, testHarness.getOutput());
+        }
+    }
+
+    @Test
+    public void testOneInputRecordAttributesForwarding() throws Exception {
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
+                testHarness = createTestHarness()) {
+            testHarness.open();
+
+            final RecordAttributes backlogRecordAttributes =
+                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+            final RecordAttributes nonBacklogRecordAttributes =
+                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+
+            testHarness.processRecordAttributes(backlogRecordAttributes);
+            testHarness.processRecordAttributes(nonBacklogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(nonBacklogRecordAttributes);
+
+            TestHarnessUtil.assertOutputEquals(
+                    "Output was not correct", expectedOutput, testHarness.getOutput());
+        }
+    }
+
     /** Extracts the result values form the test harness and clear the output queue. */
     @SuppressWarnings({"unchecked", "rawtypes"})
     private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2Test.java
index a93df77e611..c6f7a3d28ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2Test.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2Test.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.KeyedMultiInputStreamOperatorTestHarness;
@@ -131,6 +133,39 @@ public class AbstractStreamOperatorV2Test extends AbstractStreamOperatorTest {
         }
     }
 
+    @Test
+    public void testRecordAttributesForwarding() throws Exception {
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        try (KeyedMultiInputStreamOperatorTestHarness<Integer, Long> testHarness =
+                new KeyedMultiInputStreamOperatorTestHarness<>(
+                        new WatermarkTestingOperatorFactory(), BasicTypeInfo.INT_TYPE_INFO)) {
+            testHarness.setup();
+            testHarness.open();
+
+            final RecordAttributes backlogRecordAttributes =
+                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+            final RecordAttributes nonBacklogRecordAttributes =
+                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+
+            testHarness.processRecordAttributes(0, backlogRecordAttributes);
+            testHarness.processRecordAttributes(1, backlogRecordAttributes);
+            testHarness.processRecordAttributes(2, backlogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+
+            testHarness.processRecordAttributes(0, nonBacklogRecordAttributes);
+            testHarness.processRecordAttributes(1, nonBacklogRecordAttributes);
+            testHarness.processRecordAttributes(2, nonBacklogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(backlogRecordAttributes);
+            expectedOutput.add(nonBacklogRecordAttributes);
+
+            TestHarnessUtil.assertOutputEquals(
+                    "Output was not correct", expectedOutput, testHarness.getOutput());
+        }
+    }
+
     private static class WatermarkTestingOperatorFactory
             extends AbstractStreamOperatorFactory<Long> {
         @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
index 262f4b5faf1..454e0e60d5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -63,6 +64,11 @@ public class MultiInputStreamOperatorTestHarness<OUT>
         getCastedOperator().getInputs().get(idx).processWatermarkStatus(watermarkStatus);
     }
 
+    public void processRecordAttributes(int idx, RecordAttributes recordAttributes)
+            throws Exception {
+        getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes);
+    }
+
     private MultipleInputStreamOperator<OUT> getCastedOperator() {
         return (MultipleInputStreamOperator<OUT>) operator;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 3b92cf75f59..f7c86e3e65d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.Preconditions;
@@ -259,6 +260,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         }
     }
 
+    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+        if (inputs.isEmpty()) {
+            getOneInputOperator().processRecordAttributes(recordAttributes);
+        } else {
+            checkState(inputs.size() == 1);
+            Input input = inputs.get(0);
+            input.processRecordAttributes(recordAttributes);
+        }
+    }
+
     public long getCurrentWatermark() {
         return currentWatermark;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index e2addce9575..457a774211d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -89,4 +90,12 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>
     public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
         twoInputOperator.processWatermarkStatus2(watermarkStatus);
     }
+
+    public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
+        twoInputOperator.processRecordAttributes1(recordAttributes);
+    }
+
+    public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
+        twoInputOperator.processRecordAttributes2(recordAttributes);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
index b1ad73b7e21..8f22a3b494d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, OneInputStreamOperator}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.runtime.streamrecord.{RecordAttributes, StreamRecord}
 import org.apache.flink.table.planner.JLong
 
 object TimeTestUtil {
@@ -98,6 +98,8 @@ object TimeTestUtil {
       }
     }
 
+    override def processRecordAttributes(recordAttributes: RecordAttributes): Unit =
+      super.processRecordAttributes(recordAttributes)
   }
 
 }


(flink) 07/09: [FLINK-33810][runtime] Introduce and implement emitRecordAttributes method of PushingAsyncDataInput#DataOutput

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f92e2c8be845b6d2d07281d7538ac1f2468e8a67
Author: sxnan <su...@gmail.com>
AuthorDate: Mon Dec 11 17:01:57 2023 +0800

    [FLINK-33810][runtime] Introduce and implement emitRecordAttributes method of PushingAsyncDataInput#DataOutput
---
 .../connector/base/source/reader/SourceReaderBaseTest.java     |  4 ++++
 .../api/operators/sort/MultiInputSortingDataInput.java         |  8 ++++++++
 .../flink/streaming/api/operators/sort/SortingDataInput.java   |  7 +++++++
 .../apache/flink/streaming/runtime/io/FinishedDataOutput.java  |  6 ++++++
 .../flink/streaming/runtime/io/PushingAsyncDataInput.java      |  3 +++
 .../runtime/io/StreamMultipleInputProcessorFactory.java        |  6 ++++++
 .../streaming/runtime/io/StreamTwoInputProcessorFactory.java   | 10 ++++++++++
 .../flink/streaming/runtime/tasks/OneInputStreamTask.java      |  6 ++++++
 .../streaming/runtime/tasks/SourceOperatorStreamTask.java      |  6 ++++++
 .../streaming/api/operators/sort/CollectingDataOutput.java     |  6 ++++++
 .../streaming/api/operators/sort/CollectionDataInput.java      |  2 ++
 .../api/operators/sort/LargeSortingDataInputITCase.java        |  4 ++++
 .../streaming/api/operators/source/CollectingDataOutput.java   |  6 ++++++
 .../flink/streaming/runtime/io/StreamTaskNetworkInputTest.java |  4 ++++
 .../runtime/watermarkstatus/StatusWatermarkValveTest.java      |  6 ++++++
 .../apache/flink/streaming/util/SourceOperatorTestHarness.java |  6 ++++++
 16 files changed, 90 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index f5882d66797..517c624b480 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -561,5 +562,8 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
index e398028f787..9bfefc53e66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.ExceptionUtils;
@@ -425,6 +426,13 @@ public final class MultiInputSortingDataInput<IN, K> implements StreamTaskInput<
 
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            // The MultiInputSortingDataInput is only used in batch execution mode. The
+            // RecordAttributes is not used in batch execution mode. We will ignore all the
+            // RecordAttributes.
+        }
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
index 088519a2dec..ba4b71c4ab0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.ExceptionUtils;
@@ -183,6 +184,12 @@ public final class SortingDataInput<T, K> implements StreamTaskInput<T> {
 
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {
+            // The SortingDataInput is only used in batch execution mode. The RecordAttributes is
+            // not used in batch execution mode. We will ignore all the RecordAttributes.
+        }
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/FinishedDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/FinishedDataOutput.java
index 54769fa2f8a..60dff2e04f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/FinishedDataOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/FinishedDataOutput.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -53,4 +54,9 @@ public class FinishedDataOutput<IN> implements PushingAsyncDataInput.DataOutput<
     public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
         LOG.debug("Unexpected latency marker after finish() received.");
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+        LOG.debug("Unexpected recordAttributes after finish() received.");
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
index 619c2d00954..63be0726dd5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -55,5 +56,7 @@ public interface PushingAsyncDataInput<T> extends AvailabilityProvider {
         void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception;
 
         void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception;
+
+        void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception;
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index ee504eb76cf..807f03060b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
@@ -289,6 +290,11 @@ public class StreamMultipleInputProcessorFactory {
         public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
             input.processLatencyMarker(latencyMarker);
         }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            input.processRecordAttributes(recordAttributes);
+        }
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index dbfc295ace0..c7463e4fded 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
@@ -289,6 +290,15 @@ public class StreamTwoInputProcessorFactory {
                 operator.processLatencyMarker2(latencyMarker);
             }
         }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            if (inputIndex == 0) {
+                operator.processRecordAttributes1(recordAttributes);
+            } else {
+                operator.processRecordAttributes2(recordAttributes);
+            }
+        }
     }
 
     private static class FinishedOnRestoreWatermarkBypass {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index bc3abb489ec..a9220c51a95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate
 import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -252,5 +253,10 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
         public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
             operator.processLatencyMarker(latencyMarker);
         }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            operator.processRecordAttributes(recordAttributes);
+        }
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 70fa37fc1d0..c87ee6eead7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -314,6 +315,11 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
             output.emitLatencyMarker(latencyMarker);
         }
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {
+            output.emitRecordAttributes(recordAttributes);
+        }
+
         @Override
         public void emitWatermark(Watermark watermark) {
             long watermarkTimestamp = watermark.getTimestamp();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectingDataOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectingDataOutput.java
index 1c5a9ae4fcf..ca8e061cf07 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectingDataOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectingDataOutput.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators.sort;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -53,4 +54,9 @@ final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<
     public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
         events.add(latencyMarker);
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+        events.add(recordAttributes);
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java
index 63be46754b7..a326c88036b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java
@@ -53,6 +53,8 @@ final class CollectionDataInput<E> implements StreamTaskInput<E> {
                 output.emitRecord(streamElement.asRecord());
             } else if (streamElement instanceof Watermark) {
                 output.emitWatermark(streamElement.asWatermark());
+            } else if (streamElement.isRecordAttributes()) {
+                output.emitRecordAttributes(streamElement.asRecordAttributes());
             } else if (streamElement.isWatermarkStatus()) {
                 output.emitWatermarkStatus(streamElement.asWatermarkStatus());
             } else {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/LargeSortingDataInputITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/LargeSortingDataInputITCase.java
index 21aa798b12f..1d6a76547dc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/LargeSortingDataInputITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/LargeSortingDataInputITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
 import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
 import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -219,6 +220,9 @@ public class LargeSortingDataInputITCase {
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {}
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
+
         public int getSeenRecords() {
             return seenRecords;
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
index 6a6c535b891..51cf3eb2401 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators.source;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -56,6 +57,11 @@ public final class CollectingDataOutput<E> implements PushingAsyncDataInput.Data
         events.add(latencyMarker);
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+        events.add(recordAttributes);
+    }
+
     public List<Object> getEvents() {
         return events;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 7338469171b..481bdaff082 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate
 import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.checkpointing.UpstreamRecoveryTracker;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -399,6 +400,9 @@ public class StreamTaskNetworkInputTest {
 
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
     }
 
     private static class VerifyRecordsDataOutput<T> extends NoOpDataOutput<T> {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java
index 0db7b04e6c4..52bbc518541 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.watermarkstatus;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -453,6 +454,11 @@ class StatusWatermarkValveTest {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
         public StreamElement popLastSeenOutput() {
             return allOutputs.poll();
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceOperatorTestHarness.java
index 2dbfa280411..0509ab0bc74 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceOperatorTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
@@ -79,5 +80,10 @@ public class SourceOperatorTestHarness<OUT> extends AbstractStreamOperatorTestHa
         public void emitLatencyMarker(LatencyMarker latencyMarker) {
             output.emitLatencyMarker(latencyMarker);
         }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+            output.emitRecordAttributes(recordAttributes);
+        }
     }
 }


(flink) 01/09: [hotfix][runtime] CollectionDataInput handles WatermarkStatus

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72a0ed61085dfe5131fef88f2233e0f5ad9d323b
Author: sxnan <su...@gmail.com>
AuthorDate: Mon Dec 11 17:00:13 2023 +0800

    [hotfix][runtime] CollectionDataInput handles WatermarkStatus
---
 .../apache/flink/streaming/api/operators/sort/CollectionDataInput.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java
index 2926c3acd05..63be46754b7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/CollectionDataInput.java
@@ -53,6 +53,8 @@ final class CollectionDataInput<E> implements StreamTaskInput<E> {
                 output.emitRecord(streamElement.asRecord());
             } else if (streamElement instanceof Watermark) {
                 output.emitWatermark(streamElement.asWatermark());
+            } else if (streamElement.isWatermarkStatus()) {
+                output.emitWatermarkStatus(streamElement.asWatermarkStatus());
             } else {
                 throw new IllegalStateException("Unsupported element type: " + streamElement);
             }


(flink) 05/09: [FLINK-33810][runtime] Introduce emitRecordAttributes method to the Output

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28c2d2925cae0e534632c3c0f926e661acd800fb
Author: sxnan <su...@gmail.com>
AuthorDate: Thu Nov 16 15:22:27 2023 +0800

    [FLINK-33810][runtime] Introduce emitRecordAttributes method to the Output
---
 .../org/apache/flink/state/api/output/BoundedStreamTask.java  |  4 ++++
 .../api/output/operators/StateBootstrapWrapperOperator.java   |  4 ++++
 .../apache/flink/streaming/api/operators/CountingOutput.java  |  6 ++++++
 .../java/org/apache/flink/streaming/api/operators/Output.java |  9 +++++++++
 .../flink/streaming/api/operators/TimestampedCollector.java   |  6 ++++++
 .../apache/flink/streaming/runtime/io/RecordWriterOutput.java | 11 +++++++++++
 .../streaming/runtime/tasks/BroadcastingOutputCollector.java  |  8 ++++++++
 .../apache/flink/streaming/runtime/tasks/ChainingOutput.java  | 10 ++++++++++
 .../runtime/tasks/FinishedOnRestoreMainOperatorOutput.java    |  8 ++++++++
 .../flink/streaming/runtime/tasks/StreamIterationTail.java    |  4 ++++
 .../streaming/util/AbstractStreamOperatorTestHarness.java     |  6 ++++++
 .../java/org/apache/flink/streaming/util/CollectorOutput.java |  6 ++++++
 .../test/java/org/apache/flink/streaming/util/MockOutput.java |  6 ++++++
 .../operators/multipleinput/output/BroadcastingOutput.java    |  8 ++++++++
 .../CopyingSecondInputOfTwoInputStreamOperatorOutput.java     | 10 ++++++++++
 .../output/FirstInputOfTwoInputStreamOperatorOutput.java      | 10 ++++++++++
 .../multipleinput/output/OneInputStreamOperatorOutput.java    | 10 ++++++++++
 .../output/SecondInputOfTwoInputStreamOperatorOutput.java     | 10 ++++++++++
 .../operators/multipleinput/output/BlackHoleOutput.java       |  6 ++++++
 .../operators/over/NonBufferOverWindowOperatorTest.java       |  6 ++++++
 20 files changed, 148 insertions(+)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index c1b35bed4c5..cef31b5478b 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -145,6 +146,9 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
         @Override
         public void collect(StreamRecord<OUT> record) {
             inner.collect(record.getValue());
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index 933602abb2d..ccbf588b81c 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -216,6 +217,9 @@ public final class StateBootstrapWrapperOperator<
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
         @Override
         public void collect(T record) {}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
index 96028c139a6..824f8ebebca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -74,4 +75,9 @@ public class CountingOutput<OUT> implements WatermarkGaugeExposingOutput<StreamR
     public Gauge<Long> getWatermarkGauge() {
         return output.getWatermarkGauge();
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        output.emitRecordAttributes(recordAttributes);
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index cdbeff8a6b1..c621a1e7896 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.Collector;
@@ -55,4 +57,11 @@ public interface Output<T> extends Collector<T> {
     <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);
 
     void emitLatencyMarker(LatencyMarker latencyMarker);
+
+    /**
+     * Emits a {@link RecordAttributes} from an operator. This element is broadcast to all
+     * downstream operators.
+     */
+    @Experimental
+    void emitRecordAttributes(RecordAttributes recordAttributes);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
index 7e2e38e2484..79438e64776 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -91,4 +92,9 @@ public final class TimestampedCollector<T> implements Output<T> {
     public void emitLatencyMarker(LatencyMarker latencyMarker) {
         output.emitLatencyMarker(latencyMarker);
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        output.emitRecordAttributes(recordAttributes);
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 43b2d8400b8..66a8fef1c02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -211,4 +212,14 @@ public class RecordWriterOutput<OUT>
     public Gauge<Long> getWatermarkGauge() {
         return watermarkGauge;
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        try {
+            serializationDelegate.setInstance(recordAttributes);
+            recordWriter.broadcastEmit(serializationDelegate);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
index 07f9a4d7677..0f4613046ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -103,4 +104,11 @@ class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<Str
             output.close();
         }
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        for (OutputWithChainingCheck<StreamRecord<T>> output : outputs) {
+            output.emitRecordAttributes(recordAttributes);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 13c3bc6a9b6..1ffe6cfb14b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -154,4 +155,13 @@ class ChainingOutput<T>
             }
         }
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        try {
+            input.processRecordAttributes(recordAttributes);
+        } catch (Exception e) {
+            throw new ExceptionInChainedOperatorException(e);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreMainOperatorOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreMainOperatorOutput.java
index e3d23da2b24..603d525017e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreMainOperatorOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreMainOperatorOutput.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -69,6 +70,13 @@ public class FinishedOnRestoreMainOperatorOutput<OUT> implements WatermarkGaugeE
         throw new IllegalStateException();
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+            streamOutput.emitRecordAttributes(recordAttributes);
+        }
+    }
+
     @Override
     public void close() {}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index a9b54f01de0..928ef2ac946 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -129,6 +130,9 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
         @Override
         public void emitLatencyMarker(LatencyMarker latencyMarker) {}
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
         @Override
         public void collect(StreamRecord<IN> record) {
             try {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index af4ac3d3986..0d827a16238 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -72,6 +72,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -840,6 +841,11 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
             outputList.add(latencyMarker);
         }
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {
+            outputList.add(recordAttributes);
+        }
+
         @Override
         public void collect(StreamRecord<OUT> element) {
             if (outputSerializer == null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index a768427a9bb..2e6ac351aa5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -54,6 +55,11 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
         list.add(latencyMarker);
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        list.add(recordAttributes);
+    }
+
     @Override
     public void collect(StreamRecord<T> record) {
         try {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index e1e18dd135a..787a5fdda17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.InstantiationUtil;
@@ -69,6 +70,11 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
         throw new RuntimeException();
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        throw new RuntimeException("RecordAttributes is not supported for MockOutput");
+    }
+
     @Override
     public void close() {}
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/BroadcastingOutput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/BroadcastingOutput.java
index 6d158dda1ff..166dfd16e06 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/BroadcastingOutput.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/BroadcastingOutput.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.operators.multipleinput.output;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -90,4 +91,11 @@ public class BroadcastingOutput implements Output<StreamRecord<RowData>> {
             output.close();
         }
     }
+
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        for (Output<StreamRecord<RowData>> output : outputs) {
+            output.emitRecordAttributes(recordAttributes);
+        }
+    }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/CopyingSecondInputOfTwoInputStreamOperatorOutput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/CopyingSecondInputOfTwoInputStreamOperatorOutput.java
index f404217de77..f143a50b6ef 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/CopyingSecondInputOfTwoInputStreamOperatorOutput.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/CopyingSecondInputOfTwoInputStreamOperatorOutput.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.table.data.RowData;
@@ -72,6 +73,15 @@ public class CopyingSecondInputOfTwoInputStreamOperatorOutput extends OutputBase
         }
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        try {
+            operator.processRecordAttributes2(recordAttributes);
+        } catch (Exception e) {
+            throw new ExceptionInMultipleInputOperatorException(e);
+        }
+    }
+
     @Override
     public void collect(StreamRecord<RowData> record) {
         pushToOperator(record);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/FirstInputOfTwoInputStreamOperatorOutput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/FirstInputOfTwoInputStreamOperatorOutput.java
index 7a30acc2723..37ebdbbe771 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/FirstInputOfTwoInputStreamOperatorOutput.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/FirstInputOfTwoInputStreamOperatorOutput.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.table.data.RowData;
@@ -68,6 +69,15 @@ public class FirstInputOfTwoInputStreamOperatorOutput extends OutputBase {
         }
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        try {
+            operator.processRecordAttributes1(recordAttributes);
+        } catch (Exception e) {
+            throw new ExceptionInMultipleInputOperatorException(e);
+        }
+    }
+
     @Override
     public void collect(StreamRecord<RowData> record) {
         pushToOperator(record);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/OneInputStreamOperatorOutput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/OneInputStreamOperatorOutput.java
index 56cd08f5515..6b80d80c5ec 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/OneInputStreamOperatorOutput.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/OneInputStreamOperatorOutput.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.table.data.RowData;
@@ -67,6 +68,15 @@ public class OneInputStreamOperatorOutput extends OutputBase {
         }
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        try {
+            operator.processRecordAttributes(recordAttributes);
+        } catch (Exception e) {
+            throw new ExceptionInMultipleInputOperatorException(e);
+        }
+    }
+
     @Override
     public void collect(StreamRecord<RowData> record) {
         pushToOperator(record);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/SecondInputOfTwoInputStreamOperatorOutput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/SecondInputOfTwoInputStreamOperatorOutput.java
index db77a8f8004..6af43347880 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/SecondInputOfTwoInputStreamOperatorOutput.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/output/SecondInputOfTwoInputStreamOperatorOutput.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.table.data.RowData;
@@ -68,6 +69,15 @@ public class SecondInputOfTwoInputStreamOperatorOutput extends OutputBase {
         }
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        try {
+            operator.processRecordAttributes2(recordAttributes);
+        } catch (Exception e) {
+            throw new ExceptionInMultipleInputOperatorException(e);
+        }
+    }
+
     @Override
     public void collect(StreamRecord<RowData> record) {
         pushToOperator(record);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/output/BlackHoleOutput.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/output/BlackHoleOutput.java
index 2e52426c4d8..aef9777f82b 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/output/BlackHoleOutput.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/output/BlackHoleOutput.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.operators.multipleinput.output;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.table.data.RowData;
@@ -49,6 +50,11 @@ public class BlackHoleOutput implements Output<StreamRecord<RowData>> {
         // do nothing
     }
 
+    @Override
+    public void emitRecordAttributes(RecordAttributes recordAttributes) {
+        // do nothing
+    }
+
     @Override
     public void collect(StreamRecord<RowData> record) {
         // do nothing
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonBufferOverWindowOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonBufferOverWindowOperatorTest.java
index 14daa4dea0b..3431c178178 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonBufferOverWindowOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonBufferOverWindowOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -199,6 +200,11 @@ public class NonBufferOverWindowOperatorTest {
             throw new RuntimeException();
         }
 
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {
+            throw new RuntimeException();
+        }
+
         @Override
         public void collect(StreamRecord<RowData> record) {
             consumer.accept(record.getValue());


(flink) 03/09: [FLINK-33810][runtime] Introduce RecordAttributes

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42df7ebc8c1d7d589a9e9416d9000fd07015599d
Author: sxnan <su...@gmail.com>
AuthorDate: Thu Nov 16 15:15:26 2023 +0800

    [FLINK-33810][runtime] Introduce RecordAttributes
---
 .../runtime/streamrecord/RecordAttributes.java     | 71 ++++++++++++++++++++
 .../streamrecord/RecordAttributesBuilder.java      | 78 ++++++++++++++++++++++
 .../runtime/streamrecord/StreamElement.java        | 22 +++++-
 .../streamrecord/StreamElementSerializer.java      | 10 +++
 .../streamrecord/StreamElementSerializerTest.java  |  5 ++
 5 files changed, 185 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
new file mode 100644
index 00000000000..7c0335ea4fe
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A RecordAttributes describes the attributes of records from the current RecordAttributes until
+ * the next one is received. It provides the stream task with information that can be used to
+ * optimize the stream task's performance.
+ */
+@Experimental
+public class RecordAttributes extends StreamElement {
+
+    public static final RecordAttributes EMPTY_RECORD_ATTRIBUTES =
+            new RecordAttributesBuilder(Collections.emptyList()).build();
+    private final boolean isBacklog;
+
+    public RecordAttributes(boolean isBacklog) {
+        this.isBacklog = isBacklog;
+    }
+
+    /**
+     * If it returns true, then the records received after this element are stale and an operator
+     * can optionally buffer records until isBacklog=false. This allows an operator to optimize
+     * throughput at the cost of processing latency.
+     */
+    public boolean isBacklog() {
+        return isBacklog;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RecordAttributes that = (RecordAttributes) o;
+        return isBacklog == that.isBacklog;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isBacklog);
+    }
+
+    @Override
+    public String toString() {
+        return "RecordAttributes{" + "backlog=" + isBacklog + '}';
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributesBuilder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributesBuilder.java
new file mode 100644
index 00000000000..74281a48fbf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributesBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** The builder class for {@link RecordAttributes}. */
+@Experimental
+public class RecordAttributesBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(RecordAttributesBuilder.class);
+    private final List<RecordAttributes> lastRecordAttributesOfInputs;
+    @Nullable private Boolean isBacklog = null;
+
+    /**
+     * This constructor takes a list of the last RecordAttributes received from each of the
+     * operator's inputs. Each input corresponds to an input edge of the job graph. When this
+     * list is not empty, it will be used to determine the default values for those attributes that
+     * have not been explicitly set by caller.
+     */
+    public RecordAttributesBuilder(List<RecordAttributes> lastRecordAttributesOfInputs) {
+        this.lastRecordAttributesOfInputs = lastRecordAttributesOfInputs;
+    }
+
+    public RecordAttributesBuilder setBacklog(boolean isBacklog) {
+        this.isBacklog = isBacklog;
+        return this;
+    }
+
+    /**
+     * If any record attribute has not been set, we will log it at DEBUG level and determine a
+     * non-null default value as described below.
+     *
+     * <p>Default value for backlog: if any element in lastRecordAttributesOfInputs has
+     * backlog=true, use true. Otherwise, use false.
+     */
+    public RecordAttributes build() {
+        if (isBacklog == null) {
+            final boolean defaultBacklog = getDefaultBacklog();
+            LOG.debug(
+                    "backlog is not set, set to {} from the last record attributes {}.",
+                    defaultBacklog,
+                    lastRecordAttributesOfInputs);
+            isBacklog = defaultBacklog;
+        }
+        return new RecordAttributes(isBacklog);
+    }
+
+    private boolean getDefaultBacklog() {
+        for (RecordAttributes lastAttributes : lastRecordAttributesOfInputs) {
+            if (lastAttributes.isBacklog()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
index 3fbcf2e42dc..c65b0b5bf7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
-/** An element in a data stream. Can be a record or a Watermark. */
+/** An element in a data stream. Can be a record, a Watermark, or a RecordAttributes. */
 @Internal
 public abstract class StreamElement {
 
@@ -62,6 +62,15 @@ public abstract class StreamElement {
         return getClass() == LatencyMarker.class;
     }
 
+    /**
+     * Checks whether this element is a RecordAttributes.
+     *
+     * @return True, if this element is a RecordAttributes, false otherwise.
+     */
+    public final boolean isRecordAttributes() {
+        return getClass() == RecordAttributes.class;
+    }
+
     /**
      * Casts this element into a StreamRecord.
      *
@@ -103,4 +112,15 @@ public abstract class StreamElement {
     public final LatencyMarker asLatencyMarker() {
         return (LatencyMarker) this;
     }
+
+    /**
+     * Casts this element into a RecordAttributes.
+     *
+     * @return This element as a RecordAttributes.
+     * @throws java.lang.ClassCastException Thrown, if this element is actually not a
+     *     RecordAttributes.
+     */
+    public final RecordAttributes asRecordAttributes() {
+        return (RecordAttributes) this;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index b1052131769..4140c52a9ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -50,6 +50,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
     private static final int TAG_WATERMARK = 2;
     private static final int TAG_LATENCY_MARKER = 3;
     private static final int TAG_STREAM_STATUS = 4;
+    private static final int TAG_RECORD_ATTRIBUTES = 5;
 
     private final TypeSerializer<T> typeSerializer;
 
@@ -146,6 +147,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
             target.writeLong(source.readLong());
             target.writeLong(source.readLong());
             target.writeInt(source.readInt());
+        } else if (tag == TAG_RECORD_ATTRIBUTES) {
+            target.writeBoolean(source.readBoolean());
         } else {
             throw new IOException("Corrupt stream, found tag: " + tag);
         }
@@ -175,6 +178,9 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
             target.writeLong(value.asLatencyMarker().getOperatorId().getLowerPart());
             target.writeLong(value.asLatencyMarker().getOperatorId().getUpperPart());
             target.writeInt(value.asLatencyMarker().getSubtaskIndex());
+        } else if (value.isRecordAttributes()) {
+            target.write(TAG_RECORD_ATTRIBUTES);
+            target.writeBoolean(value.asRecordAttributes().isBacklog());
         } else {
             throw new RuntimeException();
         }
@@ -197,6 +203,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                     source.readLong(),
                     new OperatorID(source.readLong(), source.readLong()),
                     source.readInt());
+        } else if (tag == TAG_RECORD_ATTRIBUTES) {
+            return new RecordAttributes(source.readBoolean());
         } else {
             throw new IOException("Corrupt stream, found tag: " + tag);
         }
@@ -223,6 +231,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                     source.readLong(),
                     new OperatorID(source.readLong(), source.readLong()),
                     source.readInt());
+        } else if (tag == TAG_RECORD_ATTRIBUTES) {
+            return new RecordAttributes(source.readBoolean());
         } else {
             throw new IOException("Corrupt stream, found tag: " + tag);
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
index dc441328856..a328f5e9d03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -92,6 +93,10 @@ public class StreamElementSerializerTest {
         LatencyMarker latencyMarker =
                 new LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1);
         assertEquals(latencyMarker, serializeAndDeserialize(latencyMarker, serializer));
+
+        RecordAttributes recordAttributes =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+        assertEquals(recordAttributes, serializeAndDeserialize(recordAttributes, serializer));
     }
 
     @SuppressWarnings("unchecked")