Posted to issues@flink.apache.org by "reswqa (via GitHub)" <gi...@apache.org> on 2023/04/20 09:33:48 UTC

[GitHub] [flink] reswqa opened a new pull request, #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

reswqa opened a new pull request, #22432:
URL: https://github.com/apache/flink/pull/22432

   ## What is the purpose of the change
   
   *If an operator has nonChainedOutputs, then its corresponding output should be counted in the task-level numRecordsOut metric.*
   
   This pull request is based on `https://github.com/apache/flink/pull/13109`.
   
   
   ## Brief change log
   
     - *Include side outputs in numRecordsOut metric*
   
   ## Verifying this change
   
   This change added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1178599478


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   Good point 👍, I have updated this pull request accordingly.






[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1521565404

   Thanks @pnowojski for the review! I have addressed your second comment on this PR in the fix-up commit. As for the first comment, I would like to hear your feedback.




[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1189477827


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   Thanks for the explanation. I think I understand it. One thing is not clear to me.
   
   > The first operator does not have NonChainedOutput
   
   So what is going on with the first operator 🤔? It tries to emit records with tags, but tags are only supported in non-chained outputs. The first operator doesn't have any, and as you pointed out, `ChainedOutput` ignores records with tags. So is the `if/else` branch in the first operator's `processElement` effectively dead code?
   
   If the above is correct, is this construct officially supported behaviour in Flink? Can users create such pipelines? If not, I would either remove the first operator or replace it with a no-op/pass-through operator that just forwards input records.
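
To make the "dead code" question concrete, here is a minimal sketch of the behaviour described above. It assumes a chained output with no tag of its own forwards untagged records and drops tagged ones. `TagFilteringSketch` and `ChainedSketchOutput` are hypothetical names used only for illustration; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a chained output; not Flink's real class.
class TagFilteringSketch {

    static final class ChainedSketchOutput {
        // Assumption: a null tag means this output only forwards untagged records.
        private final String ownTag;
        final List<Integer> forwarded = new ArrayList<>();

        ChainedSketchOutput(String ownTag) {
            this.ownTag = ownTag;
        }

        // Untagged emit: forwarded only by an output without a tag.
        void collect(int value) {
            if (ownTag == null) {
                forwarded.add(value);
            }
        }

        // Tagged emit: forwarded only if the tags match, otherwise silently dropped.
        void collect(String recordTag, int value) {
            if (Objects.equals(ownTag, recordTag)) {
                forwarded.add(value);
            }
        }
    }

    // Mimics the first operator: one tagged emit plus one untagged emit per input.
    static List<Integer> runFirstOperator(int[] inputs) {
        ChainedSketchOutput output = new ChainedSketchOutput(null);
        for (int value : inputs) {
            output.collect(value % 2 == 0 ? "even" : "odd", value); // dropped by the chained output
            output.collect(value); // forwarded downstream
        }
        return output.forwarded;
    }
}
```

Under these assumptions the downstream operator sees each input exactly once, so the tagged branch has no observable effect.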
   
   





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188900545


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)

Review Comment:
   When writing this test case, I felt that considering just one of the two tags was sufficient. No matter what data is sent, each record hits exactly one `OutputTag` (`Odd` or `Even`); the two cases are symmetric, so covering only one should not affect the coverage of this test. 🤔
   
   What I actually want to cover here is the scenario where both `RecordWriter Without Tag` and `RecordWriter With Tag` exist, which is also the actual topology of this case. For more details, please refer to the picture attached in the other comment.
   
   Of course, if you think it is necessary to cover all tags here, that also makes sense to me.





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188891342


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   The topology of this test case is as follows:
   ![image](https://github.com/apache/flink/assets/19502505/cdae0246-da53-4860-b3c8-9f2d62dfe559)
   - The first operator does not have a `NonChainedOutput`.
   - The second operator has two `RecordWriter With Odd Tag` outputs, created by this code: `.addNonChainedOutputsCount(new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)`
   
   The asserted value (`numOddRecords + (numOddRecords + numEvenRecords) + (numOddRecords + numEvenRecords) * 2`) is the sum of the following three parts:
   
   - `numOddRecords` from the second `OddEvenOperator`'s two `RecordWriterOutput With Odd Tag`. Each record is only counted once here, which is guaranteed by the logic of `BroadcastingOutputCollector`.
   - `numOddRecords + numEvenRecords` from the second `OddEvenOperator`'s `RecordWriterOutput`.
   - `(numOddRecords + numEvenRecords) * 2` from the `DuplicatingOperator`.
   
   Note that:
   - `OddEvenOperator` sends each record to both the side output and the normal output, which gives the first two parts above.
   - `ChainedOutput` ignores data with an output tag, as we don't set `outputTag` for the chaining `StreamEdge`. So subsequent operators will not receive duplicate data.
   ```
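   public void processElement(StreamRecord<Integer> element) {
       // Tagged emit: goes to the side output matching the record's parity.
       if (element.getValue() % 2 == 0) {
           output.collect(evenOutputTag, element);
       } else {
           output.collect(oddOutputTag, element);
       }
       // Untagged emit: every record also goes to the normal output.
       output.collect(element);
   }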
   ```





[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1173917579


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java:
##########
@@ -27,37 +29,57 @@
  */
 final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
 
-    public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        super(outputs);
+    public CopyingBroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        super(allOutputs, numRecordsOutForTask);
     }
 
     @Override
     public void collect(StreamRecord<T> record) {
+        boolean emitted = false;
+        for (int i = 0; i < nonChainedOutputs.length - 1; i++) {
+            RecordWriterOutput<T> output = nonChainedOutputs[i];
+            StreamRecord<T> shallowCopy = record.copy(record.getValue());
+            emitted |= output.collectAndCheckIfEmitted(shallowCopy);
+        }
 
-        for (int i = 0; i < outputs.length - 1; i++) {
-            Output<StreamRecord<T>> output = outputs[i];
+        if (chainedOutputs.length == 0 && nonChainedOutputs.length > 0) {
+            emitted |=
+                    nonChainedOutputs[nonChainedOutputs.length - 1].collectAndCheckIfEmitted(
+                            record);
+        } else if (nonChainedOutputs.length > 0) {
             StreamRecord<T> shallowCopy = record.copy(record.getValue());
-            output.collect(shallowCopy);
+            emitted |=
+                    nonChainedOutputs[nonChainedOutputs.length - 1].collectAndCheckIfEmitted(
+                            shallowCopy);
         }
 
-        if (outputs.length > 0) {
-            // don't copy for the last output
-            outputs[outputs.length - 1].collect(record);
+        if (chainedOutputs.length > 0) {
+            for (int i = 0; i < chainedOutputs.length - 1; i++) {
+                Output<StreamRecord<T>> output = chainedOutputs[i];
+                StreamRecord<T> shallowCopy = record.copy(record.getValue());
+                output.collect(shallowCopy);
+            }
+            chainedOutputs[chainedOutputs.length - 1].collect(record);
+        }
+
+        if (emitted) {
+            numRecordsOutForTask.inc();
         }
     }
 
     @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        for (int i = 0; i < outputs.length - 1; i++) {
-            Output<StreamRecord<T>> output = outputs[i];
+        for (int i = 0; i < allOutputs.length - 1; i++) {
+            Output<StreamRecord<T>> output = allOutputs[i];
 
             StreamRecord<X> shallowCopy = record.copy(record.getValue());
             output.collect(outputTag, shallowCopy);
         }
 
-        if (outputs.length > 0) {
+        if (allOutputs.length > 0) {
             // don't copy for the last output
-            outputs[outputs.length - 1].collect(outputTag, record);
+            allOutputs[allOutputs.length - 1].collect(outputTag, record);
         }
     }
 }

Review Comment:
   Shouldn't this also be changed?
   
   Doesn't it mean we are missing unit tests for this?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   Could we move this splitting logic up, for example to the `OperatorChain`? The `instanceof` check here might be fragile in the future, while in `OperatorChain#createOutputCollector` it looks like we have this information at hand when we iterate over `operatorConfig`.
   
   Actually, wouldn't it simplify the logic, especially in `CopyingBroadcastingOutputCollector`, if we replaced the `chainedOutputs` and `nonChainedOutputs` arrays with a single array `boolean[] isOutputChained` (also constructed at the `OperatorChain` level)? Then, instead of iterating over two arrays one after another, you could easily handle everything in one loop:
   
   ```
   for (int i = 0; i < outputs.length; i++) {
       ...
       if (isOutputChained[i]) {
           ...
       }
   }
   ```
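
For illustration, a fuller stand-alone version of that loop might look like the sketch below. It is not the Flink implementation: `SketchOutput` is a hypothetical interface whose `collect` reports whether the record was written, standing in for the `collectAndCheckIfEmitted` call in the diff, and records are plain strings to keep the example self-contained.

```java
// Hypothetical single-loop collector driven by a boolean[] isOutputChained flag array.
class SingleLoopBroadcastSketch {

    /** Hypothetical minimal output: returns true if the record was actually written. */
    interface SketchOutput {
        boolean collect(String record);
    }

    private final SketchOutput[] outputs;
    private final boolean[] isOutputChained;
    long numRecordsOut = 0;

    SingleLoopBroadcastSketch(SketchOutput[] outputs, boolean[] isOutputChained) {
        if (outputs.length != isOutputChained.length) {
            throw new IllegalArgumentException("one flag per output is required");
        }
        this.outputs = outputs;
        this.isOutputChained = isOutputChained;
    }

    void collect(String record) {
        boolean emitted = false;
        for (int i = 0; i < outputs.length; i++) {
            boolean written = outputs[i].collect(record);
            // Only non-chained outputs contribute to the task-level counter.
            if (!isOutputChained[i]) {
                emitted |= written;
            }
        }
        // Count each record at most once, however many non-chained outputs wrote it.
        if (emitted) {
            numRecordsOut++;
        }
    }
}
```

The flag array keeps the outputs in their original order, so no `instanceof` check or second array is needed to tell the two kinds of output apart.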
   





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1189760303


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   > If the above is correct, is this construct officially supported behaviour of Flink? Can user create such pipelines? 
   
   Fair enough! This is indeed not a conventional pipeline. I have replaced the first operator with a `PassThroughOperator`.
   





[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1542165300

   Thanks @pnowojski for the patient review and very helpful suggestion! 👍 All fix-up commits have been squashed.




[GitHub] [flink] pnowojski commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1540337849

   One more thing, have you maybe manually confirmed that the bug fix is working after all of the changes in this PR? Just to double check your unit tests are indeed testing the right thing :)




[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188607839


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * This is a wrapper for outputs to check whether the collected record has been emitted to a
+ * downstream subtask or to a chained operator.
+ */
+@Internal
+public interface OutputWithChainingCheck<OUT> extends WatermarkGaugeExposingOutput<OUT> {
+    /**
+     * @return true if the collected record has been emitted to a downstream subtask. Otherwise,
+     *     false.
+     */
+    boolean collectAndCheckIfCountNeeded(OUT record);

Review Comment:
   nitty nit: rename to `collectAndCheckIfChained`?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)

Review Comment:
   Why do we need to pass a tag for one but not for the other? 🤔



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   I don't follow this assertion.
   
   Shouldn't this be:
   
   - `(numOddRecords + numEvenRecords) * 2` from the `DuplicatingOperator`
   - `numOddRecords + numEvenRecords` from the first `OddEvenOperator`
   - `numOddRecords + numEvenRecords` from the second `OddEvenOperator`
   
   So `(numOddRecords + numEvenRecords) * 4` in total? 🤔
   
   nit: It would also help if you could extract the components of the total expected output record count into separate named local variables, for example:
   ```
               int firstOddEvenOperatorOutputs = ...;
               int secondOddEvenOperatorOutputs = ...;
               int duplicatingOperatorOutput = ...;
               assertEquals(
                       firstOddEvenOperatorOutputs + secondOddEvenOperatorOutputs + duplicatingOperatorOutput,
                       numRecordsOutCounter.getCount());
   ```
   
   Ditto for the multi-input and two-input cases.
   





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188891342


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   The topology graph of this test case is as follows:
   ![image](https://github.com/apache/flink/assets/19502505/cdae0246-da53-4860-b3c8-9f2d62dfe559)
   - The first operator does not have any `NonChainedOutput`.
   - The second operator has two `RecordWriter With Odd Tag` outputs, as configured by this code: `.addNonChainedOutputsCount(new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)`
   
   The asserted value (`numOddRecords + (numOddRecords + numEvenRecords) + (numOddRecords + numEvenRecords) * 2`) is the sum of the following three parts:
   
   - `numOddRecords` from the second `OddEvenOperator`'s two `RecordWriterOutput With Odd Tag` outputs. It is counted only once here, which is guaranteed by the logic of `BroadcastingOutputCollector`.
   - `numOddRecords + numEvenRecords` from the second `OddEvenOperator`'s `RecordWriterOutput`.
   - `(numOddRecords + numEvenRecords) * 2` from the `DuplicatingOperator`.
   
   It should be noted here that:
   - `OddEvenOperator` sends each record to both a side output and the normal output. That is why we have the first two parts above.
   ```
   public void processElement(StreamRecord<Integer> element) {
       if (element.getValue() % 2 == 0) {
           output.collect(evenOutputTag, element);
       } else {
           output.collect(oddOutputTag, element);
       }
       output.collect(element);
   }
   ```
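
   As a sanity check on the arithmetic above, here is a minimal sketch that recomputes the expected count for the values used in the test (`numOddRecords = 3`, `numEvenRecords = 5`). The class and method names are hypothetical and not part of the PR; the three components simply mirror the three parts listed above.

```java
// Hypothetical helper, not part of the PR: recomputes the expected task-level numRecordsOut.
final class ExpectedNumRecordsOut {

    static int expected(int numOddRecords, int numEvenRecords) {
        int totalRecords = numOddRecords + numEvenRecords;

        // Odd records go to two outputs with the "odd" tag, but each record is counted once.
        int oddSideOutput = numOddRecords;
        // Every record is also written to the second operator's untagged non-chained output.
        int mainOutput = totalRecords;
        // The DuplicatingOperator emits every record it receives twice.
        int duplicatingOperatorOutput = totalRecords * 2;

        return oddSideOutput + mainOutput + duplicatingOperatorOutput;
    }

    public static void main(String[] args) {
        // 3 + (3 + 5) + (3 + 5) * 2 = 27
        System.out.println(expected(3, 5));
    }
}
```

   With the test's inputs this gives 3 + 8 + 16 = 27 records out for 8 records in.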





[GitHub] [flink] pnowojski merged pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski merged PR #22432:
URL: https://github.com/apache/flink/pull/22432




[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1175224675


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   >if replaced chainedOutputs and nonChainedOutputs arrays, with a single array boolean[] isOutputChained 
   
   That's really good advice. However, it seems that casts like the following cannot be completely avoided, because `collectAndCheckIfEmitted` is defined on `RecordWriterOutput`. I'm concerned about the performance impact of this. Another approach would be to move `collectAndCheckIfEmitted` to the `Output`/`Collector` interface, but that would touch the `public` API. Or do we need to introduce a special interface to handle this?
   
   ```
   for (int i = 0; i < outputs.length; i++) {
       if (isOutputChained[i]) {
           outputs[i].collect(record);
       } else {
           emitted |= ((RecordWriterOutput<T>) outputs[i]).collectAndCheckIfEmitted(record);
       }
   }
   ```
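
   To make the "special interface" option concrete, here is a rough, self-contained sketch. Everything in it is an assumption for illustration: `CheckedOutput` and `BroadcastSketch` are hypothetical names, and a plain `long` stands in for Flink's `Counter`. If every output reports whether the record left the task, the loop needs neither a cast nor an `isOutputChained` array, and a broadcast record is still counted only once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output that reports whether the record left the task.
interface CheckedOutput<T> {
    boolean collectAndCheckIfEmitted(T record);
}

// Hypothetical simplified broadcaster; a plain long replaces the metrics counter.
final class BroadcastSketch<T> {
    private final List<CheckedOutput<T>> outputs;
    private long numRecordsOut;

    BroadcastSketch(List<CheckedOutput<T>> outputs) {
        this.outputs = outputs;
    }

    void collect(T record) {
        boolean emitted = false;
        for (CheckedOutput<T> output : outputs) {
            // Non-short-circuit OR: every output still receives the record.
            emitted |= output.collectAndCheckIfEmitted(record);
        }
        if (emitted) {
            // Count the record once, however many non-chained outputs received it.
            numRecordsOut++;
        }
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }

    // Broadcasts three records to two "non-chained" outputs and one "chained" output.
    static long demo() {
        List<CheckedOutput<Integer>> outputs = new ArrayList<>();
        outputs.add(record -> true);  // behaves like a non-chained output
        outputs.add(record -> true);  // behaves like a non-chained output
        outputs.add(record -> false); // behaves like a chained output
        BroadcastSketch<Integer> broadcaster = new BroadcastSketch<>(outputs);
        for (int i = 0; i < 3; i++) {
            broadcaster.collect(i);
        }
        return broadcaster.getNumRecordsOut();
    }
}
```

   In this sketch `demo()` counts three records, not six, even though each record reaches two non-chained outputs.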





[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1180210194


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithRecordsCountCheck.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/** This is a wrapper for outputs to check whether its record output needs to be counted. */
+@Internal
+public interface OutputWithRecordsCountCheck<OUT> {

Review Comment:
   I think this should extend `Output`, otherwise it can lead to strange issues like
   `public class X implements Output<Foo>, OutputWithRecordsCountCheck<Bar>`.
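
   A small sketch of the concern, using simplified stand-ins rather than the real Flink types (`SimpleOutput`, `StandaloneCheck`, `CheckingOutput` and both classes are hypothetical names): with two unrelated interfaces the compiler accepts mismatched record types, while an interface that extends the output interface forces a single type parameter.

```java
// Simplified stand-in for an output interface; the real Flink Output has more methods.
interface SimpleOutput<T> {
    void collect(T record);
}

// Standalone variant: nothing ties its type parameter to SimpleOutput's.
interface StandaloneCheck<T> {
    boolean collectAndCheck(T record);
}

// Extending variant: one type parameter covers both methods.
interface CheckingOutput<T> extends SimpleOutput<T> {
    boolean collectAndCheck(T record);
}

// Compiles even though the two record types disagree (String vs. Integer).
final class Mismatched implements SimpleOutput<String>, StandaloneCheck<Integer> {
    @Override
    public void collect(String record) {}

    @Override
    public boolean collectAndCheck(Integer record) {
        return true;
    }
}

// With the extending variant, both methods must accept the same record type.
final class Consistent implements CheckingOutput<String> {
    int collected;

    @Override
    public void collect(String record) {
        collected++;
    }

    @Override
    public boolean collectAndCheck(String record) {
        collect(record);
        return true;
    }
}
```

   A further effect of the extending variant is that a single array of such outputs can serve both the plain `collect` path and the checking path.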



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithRecordsCountCheck.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/** This is a wrapper for outputs to check whether its record output needs to be counted. */

Review Comment:
   ```
   This is a wrapper for outputs to check whether the collected record has been emitted to a downstream subtask or to a chained operator
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -774,6 +801,14 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
         return result;
     }
 
+    private static Counter createStreamCounter(StreamTask<?, ?> containingTask) {

Review Comment:
   rename to `createNumRecordsOutCounter`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -743,22 +747,45 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
 
         if (allOutputs.size() == 1) {
             result = allOutputs.get(0);
+            // only if this is a single RecordWriterOutput, reuse its numRecordOut for task.
+            if (result instanceof RecordWriterOutput) {
+                TaskIOMetricGroup taskIOMetricGroup =
+                        containingTask.getEnvironment().getMetricGroup().getIOMetricGroup();
+                Counter counter = new SimpleCounter();
+                ((RecordWriterOutput<T>) result).setNumRecordsOut(counter);
+                taskIOMetricGroup.reuseRecordsOutputCounter(counter);

Review Comment:
   reuse `createNumRecordsOutCounter`?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java:
##########
@@ -262,4 +289,143 @@ public StreamConfigChainer<OWNER> name(String name) {
     public void setBufferTimeout(int bufferTimeout) {
         this.bufferTimeout = bufferTimeout;
     }
+
+    /** Helper class to build operator node. */
+    public static class StreamConfigEdgeChainer<OWNER, IN, OUT> {

Review Comment:
   nit: for the future, it would be easier to review if you added this builder class in a separate commit preceding your changes. The bug fix commit would then only need to add the `outEdgesInOrder` and `setTailNonChainedOutputs` fields to the builder, making it smaller and easier to review.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -31,12 +32,19 @@
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
+    protected final OutputWithRecordsCountCheck<StreamRecord<T>>[] outputWithRecordsCountChecks;
     protected final Output<StreamRecord<T>>[] outputs;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+    protected final Counter numRecordsOutForTask;
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+    public BroadcastingOutputCollector(
+            OutputWithRecordsCountCheck<StreamRecord<T>>[] outputWithRecordsCountChecks,
+            Output<StreamRecord<T>>[] outputs,
+            Counter numRecordsOutForTask) {
         this.outputs = outputs;
+        this.outputWithRecordsCountChecks = outputWithRecordsCountChecks;

Review Comment:
   That could be solved by making `OutputWithRecordsCountCheck` extend from `Output`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithRecordsCountCheck.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/** This is a wrapper for outputs to check whether its record output needs to be counted. */
+@Internal
+public interface OutputWithRecordsCountCheck<OUT> {

Review Comment:
   Maybe let's rename this to `OutputWithChainingCheck`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithRecordsCountCheck.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/** This is a wrapper for outputs to check whether its record output needs to be counted. */
+@Internal
+public interface OutputWithRecordsCountCheck<OUT> {
+    /**
+     * Collect a record and check if it needs to be counted.
+     *
+     * @param record The record to collect.
+     */
+    boolean collectAndCheckIfCountNeeded(OUT record);
+
+    /**
+     * Collect a record to the side output identified by the given {@link OutputTag} and check if it
+     * needs to be counted.
+     *
+     * @param record The record to collect.
+     * @param outputTag Identification of side outputs.
+     */
+    <X> boolean collectAndCheckIfCountNeeded(OutputTag<X> outputTag, StreamRecord<X> record);

Review Comment:
   I would drop these Javadocs, as they repeat the class-level Javadoc. The `@param` tags don't add much information either.
   
   However, I would explain what the return value means.
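
A sketch of what a Javadoc focused on the return value could look like. The interface below is a simplified stand-in with a hypothetical name, not the class under review. The stated meaning of the return value (true when the record went to a downstream subtask, false when it only went to a chained operator) is an assumption based on the wrapper description suggested earlier in this review.

```java
public class ReturnValueDocSketch {

    /** Hypothetical, simplified stand-in for the interface under review. */
    public interface OutputWithChainingCheckSketch<OUT> {
        /**
         * @return {@code true} if the record was emitted to a downstream subtask (a non-chained
         *     output) and therefore should be counted, {@code false} if it was only passed to a
         *     chained operator.
         */
        boolean collectAndCheckIfCountNeeded(OUT record);
    }

    /** Stand-in for a network-facing output: the record leaves the task. */
    public static <OUT> OutputWithChainingCheckSketch<OUT> nonChained() {
        return record -> true;
    }

    /** Stand-in for a chained output: the record stays inside the task. */
    public static <OUT> OutputWithChainingCheckSketch<OUT> chained() {
        return record -> false;
    }
}
```

With the return value documented this way, the method-level description and the `@param` tags can be dropped without losing information.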



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java:
##########
@@ -77,6 +86,7 @@ private void head(OperatorID headOperatorID) {
         headConfig.setChainIndex(chainIndex);
     }
 
+    @Deprecated

Review Comment:
   nit: could you add a Javadoc pointing to what should be used instead of those deprecated methods?
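
A minimal sketch of the requested pattern, assuming the deprecated method can simply delegate to its replacement. The class and method names are hypothetical placeholders, not the actual methods of `StreamConfigChainer`.

```java
public class DeprecationDocSketch {

    /**
     * Old entry point, kept for existing callers.
     *
     * @deprecated Use {@link #chainWithBuilder(String)} instead (hypothetical replacement name).
     */
    @Deprecated
    public static String chain(String operatorName) {
        return chainWithBuilder(operatorName);
    }

    /** Hypothetical replacement that the deprecated method delegates to. */
    public static String chainWithBuilder(String operatorName) {
        return "chained:" + operatorName;
    }
}
```

The `@deprecated` Javadoc tag carries the pointer to the replacement, while the `@Deprecated` annotation makes the compiler warn callers.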





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188891342


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   The topology graph of this test case is as follows:
   ![image](https://github.com/apache/flink/assets/19502505/cdae0246-da53-4860-b3c8-9f2d62dfe559)
   - The first operator does not have a `NonChainedOutput`.
   - The second operator has two `RecordWriter With Odd Tag` outputs, set up by this code: `.addNonChainedOutputsCount(new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)`
   
   The asserted value (`numOddRecords + (numOddRecords + numEvenRecords) + (numOddRecords + numEvenRecords) * 2`) is the sum of the following three parts:
   
   - `numOddRecords` from the second `OddEvenOperator`'s two `RecordWriterOutput With Odd Tag`. Each record is counted only once here, which is guaranteed by the logic of `BroadcastingOutputCollector`.
   - `numOddRecords + numEvenRecords` from the second `OddEvenOperator`'s `RecordWriterOutput`.
   - `(numOddRecords + numEvenRecords) * 2` from the `DuplicatingOperator`.
   
   Note that:
   - `OddEvenOperator` sends each record to both a side output and the normal output, which is why the first two parts above exist.
   ```
        public void processElement(StreamRecord<Integer> element) {
               if (element.getValue() % 2 == 0) {
                   output.collect(evenOutputTag, element);
               } else {
                   output.collect(oddOutputTag, element);
               }
               output.collect(element);
        }
   ```
   - `ChainedOutput` ignores records that carry an output tag, because we don't set an `outputTag` for the chaining `StreamEdge`. So subsequent operators do not receive duplicate records.
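
As a worked check of the arithmetic, the three parts can be added up for the values used in the test (`numEvenRecords = 5`, `numOddRecords = 3`). The class and method names here are illustrative only; the formula itself is the one from the assertion.

```java
public class ExpectedNumRecordsOutSketch {

    /** Sums the three parts of the asserted numRecordsOut value for this topology. */
    public static long expectedNumRecordsOut(int numEvenRecords, int numOddRecords) {
        int total = numOddRecords + numEvenRecords;
        // Odd-tagged side output of the second OddEvenOperator: counted once per odd
        // record, even though two writers carry the "odd" tag.
        long sideOutput = numOddRecords;
        // Untagged non-chained output of the second OddEvenOperator: every record.
        long mainOutput = total;
        // DuplicatingOperator: every record it receives is emitted twice.
        long duplicated = 2L * total;
        return sideOutput + mainOutput + duplicated;
    }

    public static void main(String[] args) {
        // 3 + 8 + 16
        System.out.println(expectedNumRecordsOut(5, 3)); // prints 27
    }
}
```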





[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1540595646

   Thanks @pnowojski for the review!
   
   I have resolved some of the comments, and replied to the rest in comments. I will update promptly after receiving feedback.




[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1540595241

   > One more thing, have you maybe manually confirmed that the bug fix is working after all of the changes in this PR.
   
   Yes, I manually tested the topologies described in [FLINK-31852](https://issues.apache.org/jira/browse/FLINK-31852) and [FLINK-18808](https://issues.apache.org/jira/browse/FLINK-18808), and the results seem to be in line with expectations.
   




[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1189479094


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)

Review Comment:
   OK, I get it now. It's fine as it is :)





[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1542002743

   Thanks for the quick reply. I have squashed all previous fix-up commits and pushed a new commit to address the latest comment.




[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188891342


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   The topology graph of this test case is as follows:
   ![image](https://github.com/apache/flink/assets/19502505/cdae0246-da53-4860-b3c8-9f2d62dfe559)
   - The first operator does not have a `NonChainedOutput`.
   - The second operator has two `RecordWriter With Odd Tag` outputs, set up by this code: `.addNonChainedOutputsCount(new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)`
   
   The asserted value (`numOddRecords + (numOddRecords + numEvenRecords) + (numOddRecords + numEvenRecords) * 2`) is the sum of the following three parts:
   
   - `numOddRecords` from the second `OddEvenOperator`'s two `RecordWriterOutput With Odd Tag`. Each record is counted only once here, which is guaranteed by the logic of `BroadcastingOutputCollector`.
   - `numOddRecords + numEvenRecords` from the second `OddEvenOperator`'s `RecordWriterOutput`.
   - `(numOddRecords + numEvenRecords) * 2` from the `DuplicatingOperator`.
   
   Note that:
   - `OddEvenOperator` sends each record to both a side output and the normal output, which is why the first two parts above exist.
   - `ChainedOutput` ignores records that carry an output tag, so subsequent operators do not receive duplicate records.
   ```
        public void processElement(StreamRecord<Integer> element) {
               if (element.getValue() % 2 == 0) {
                   output.collect(evenOutputTag, element);
               } else {
                   output.collect(oddOutputTag, element);
               }
               output.collect(element);
        }
   ```





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1178599478


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   Good point 👍, I have updated this pull request according to this.





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1178600568


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -31,12 +32,19 @@
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
+    protected final OutputWithRecordsCountCheck<StreamRecord<T>>[] outputWithRecordsCountChecks;
     protected final Output<StreamRecord<T>>[] outputs;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+    protected final Counter numRecordsOutForTask;
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+    public BroadcastingOutputCollector(
+            OutputWithRecordsCountCheck<StreamRecord<T>>[] outputWithRecordsCountChecks,
+            Output<StreamRecord<T>>[] outputs,
+            Counter numRecordsOutForTask) {
         this.outputs = outputs;
+        this.outputWithRecordsCountChecks = outputWithRecordsCountChecks;

Review Comment:
   The reason `outputs` is not removed is that some methods, such as `emitLatencyMarker`, are still only available on the `Output` interface.





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1175228470


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java:
##########
@@ -27,37 +29,57 @@
  */
 final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
 
-    public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        super(outputs);
+    public CopyingBroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        super(allOutputs, numRecordsOutForTask);
     }
 
     @Override
     public void collect(StreamRecord<T> record) {
+        boolean emitted = false;
+        for (int i = 0; i < nonChainedOutputs.length - 1; i++) {
+            RecordWriterOutput<T> output = nonChainedOutputs[i];
+            StreamRecord<T> shallowCopy = record.copy(record.getValue());
+            emitted |= output.collectAndCheckIfEmitted(shallowCopy);
+        }
 
-        for (int i = 0; i < outputs.length - 1; i++) {
-            Output<StreamRecord<T>> output = outputs[i];
+        if (chainedOutputs.length == 0 && nonChainedOutputs.length > 0) {
+            emitted |=
+                    nonChainedOutputs[nonChainedOutputs.length - 1].collectAndCheckIfEmitted(
+                            record);
+        } else if (nonChainedOutputs.length > 0) {
             StreamRecord<T> shallowCopy = record.copy(record.getValue());
-            output.collect(shallowCopy);
+            emitted |=
+                    nonChainedOutputs[nonChainedOutputs.length - 1].collectAndCheckIfEmitted(
+                            shallowCopy);
         }
 
-        if (outputs.length > 0) {
-            // don't copy for the last output
-            outputs[outputs.length - 1].collect(record);
+        if (chainedOutputs.length > 0) {
+            for (int i = 0; i < chainedOutputs.length - 1; i++) {
+                Output<StreamRecord<T>> output = chainedOutputs[i];
+                StreamRecord<T> shallowCopy = record.copy(record.getValue());
+                output.collect(shallowCopy);
+            }
+            chainedOutputs[chainedOutputs.length - 1].collect(record);
+        }
+
+        if (emitted) {
+            numRecordsOutForTask.inc();
         }
     }
 
     @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        for (int i = 0; i < outputs.length - 1; i++) {
-            Output<StreamRecord<T>> output = outputs[i];
+        for (int i = 0; i < allOutputs.length - 1; i++) {
+            Output<StreamRecord<T>> output = allOutputs[i];
 
             StreamRecord<X> shallowCopy = record.copy(record.getValue());
             output.collect(outputTag, shallowCopy);
         }
 
-        if (outputs.length > 0) {
+        if (allOutputs.length > 0) {
             // don't copy for the last output
-            outputs[outputs.length - 1].collect(outputTag, record);
+            allOutputs[allOutputs.length - 1].collect(outputTag, record);
         }
     }
 }

Review Comment:
   Good catch! 👍
   
   I somehow lost the changes here when migrating from the original code. You are right: the previous test did not cover `ObjectReuse`. I will modify the tests to cover this case.
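
For reference, the counting rule under discussion can be sketched as follows: a record broadcast to several outputs increments the task-level counter at most once, and only if at least one output actually sent it out of the task. The types below are simplified stand-ins rather than Flink's classes, each output is modelled as a predicate that reports whether it emitted the record, and record copying is left out.

```java
import java.util.List;
import java.util.function.Predicate;

public class CountOnceBroadcastSketch {

    /** Minimal counter stand-in (hypothetical, not Flink's Counter). */
    public static final class SketchCounter {
        private long count;

        public void inc() {
            count++;
        }

        public long getCount() {
            return count;
        }
    }

    /**
     * Broadcasts a record to all outputs. Each output returns true if it emitted the
     * record out of the task, false if it only handed it to a chained operator.
     */
    public static <T> void broadcast(
            T record, List<Predicate<T>> outputs, SketchCounter numRecordsOutForTask) {
        boolean emitted = false;
        for (Predicate<T> output : outputs) {
            // Call test() first so that every output receives the record; writing
            // "emitted || output.test(record)" would short-circuit and skip outputs.
            emitted = output.test(record) || emitted;
        }
        if (emitted) {
            // Counted once per record, regardless of how many outputs emitted it.
            numRecordsOutForTask.inc();
        }
    }
}
```

A test that enables object reuse would exercise the non-copying collector with the same expectation: one increment per record that leaves the task.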





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188900545


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)

Review Comment:
   When writing this test case, I felt that covering just one of the two tags was sufficient. No matter how the data is sent, only one `OutputTag` (`Odd` or `Even`) will actually be hit. The two cases are symmetric, so covering only one of them should not affect the coverage of this test.
   
   What I actually want to cover here is the scenario where both a `RecordWriter Without Tag` and a `RecordWriter With Tag` exist, which is also the actual topology of this case. For more details, please refer to the picture attached in the other comments.
   
   Of course, if you think it is necessary to cover all tags here, that also makes sense to me.





[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1173900615


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   Could we move this splitting logic up, for example to the `OperatorChain`? The `instanceof` check here might be fragile in the future, while in `OperatorChain#createOutputCollector` it looks like we have this information at hand when we iterate over `operatorConfig`.
   
   Actually, wouldn't it simplify the logic, especially in `CopyingBroadcastingOutputCollector`, if we replaced the `chainedOutputs` and `nonChainedOutputs` arrays with a single array `boolean[] isOutputChained` (also constructed at the `OperatorChain` level)? Then, instead of iterating over two arrays one after another, you could easily handle everything in one loop:
   
   ```
   for (int i = 0; i < outputs.length; i++) {
       ...
       if (isOutputChained[i]) {
           ...
       }
   }
   ```
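The single-loop idea above could be sketched roughly as follows. This is only an illustration: `SketchOutput`, `SketchCounter` and `SingleLoopBroadcaster` are hypothetical stand-ins rather than Flink classes, records are plain `int`s, and the rule that only non-chained outputs bump the task-level counter is an assumption taken from the PR description.

```java
// Hypothetical stand-ins for illustration only; these are not Flink's classes.
interface SketchOutput {
    void collect(int record);
}

final class SketchCounter {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

final class SingleLoopBroadcaster implements SketchOutput {
    private final SketchOutput[] outputs;
    // isOutputChained[i] is true when outputs[i] hands records to a chained operator.
    private final boolean[] isOutputChained;
    private final SketchCounter numRecordsOutForTask;

    SingleLoopBroadcaster(
            SketchOutput[] outputs,
            boolean[] isOutputChained,
            SketchCounter numRecordsOutForTask) {
        if (outputs.length != isOutputChained.length) {
            throw new IllegalArgumentException("Expected one flag per output");
        }
        this.outputs = outputs;
        this.isOutputChained = isOutputChained;
        this.numRecordsOutForTask = numRecordsOutForTask;
    }

    @Override
    public void collect(int record) {
        // One pass over all outputs; the flag decides whether the record is counted.
        for (int i = 0; i < outputs.length; i++) {
            outputs[i].collect(record);
            // Assumption: only non-chained outputs contribute to the task-level count.
            if (!isOutputChained[i]) {
                numRecordsOutForTask.inc();
            }
        }
    }
}
```

In this sketch, a broadcaster with one chained and two non-chained outputs raises the counter by two for every record it collects.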
   





[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1535841385

   Thanks @pnowojski for the review!
   
   Sorry for the late reply; the previous day was a holiday in China 😄. I have updated this PR according to your comments. Please take another look when you have time.




[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1517630686

   Hi @pnowojski, would you mind taking a look at this in your free time, since you reviewed the original [PR](https://github.com/apache/flink/pull/13109)?




[GitHub] [flink] flinkbot commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1516030324

   ## CI report:
   
   * d7e4a5ecf6375f658a03b325b35e517ba021feaa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1178599478


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   Good point 👍. I have updated this pull request accordingly. Please take a look when you have time.





[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1177575706


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java:
##########
@@ -17,52 +17,77 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final Output<StreamRecord<T>>[] allOutputs;
+
+    protected final Output<StreamRecord<T>>[] chainedOutputs;
+
+    protected final RecordWriterOutput<T>[] nonChainedOutputs;
+
+    protected final Counter numRecordsOutForTask;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        this.outputs = outputs;
+    @SuppressWarnings({"unchecked"})
+    public BroadcastingOutputCollector(
+            Output<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        this.allOutputs = allOutputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
+
+        List<Output<StreamRecord<T>>> chainedOutputs = new ArrayList<>(4);
+        List<RecordWriterOutput<T>> nonChainedOutputs = new ArrayList<>(4);
+        for (Output<StreamRecord<T>> output : allOutputs) {
+            if (output instanceof RecordWriterOutput) {
+                nonChainedOutputs.add((RecordWriterOutput<T>) output);
+            } else {
+                chainedOutputs.add(output);
+            }
+        }
+        this.chainedOutputs = chainedOutputs.toArray(new Output[0]);
+        this.nonChainedOutputs = nonChainedOutputs.toArray(new RecordWriterOutput[0]);

Review Comment:
   I see. Maybe instead of using an `if/else` check, we could use some object-oriented wrapper? Something like `OutputWithRecordsEmittedCheck`, which would be an `@Internal` interface. An implementation could be:
   
   ```
   class ChainingOutput implements OutputWithRecordsEmittedCheck {
     boolean collectAndCheckIfEmitted(...) {
       this.underlyingOutput.collect(...);
       return false;
     }
   }
   ```
   Using this interface, we could implement it in the existing internal implementations/wrappers (to avoid adding another layer), like `ChainingOutput`.
   
   We could also avoid `RecordWriterCountingOutput` wrapper, by adding the records emitted counter directly to the `RecordWriterOutput`.
   
   Maybe those changes wouldn't be too complicated? WDYT? If so, it might make sense to split this PR into at least two smaller commits. For example, the first commit could introduce `OutputWithRecordsEmittedCheck` and its implementations, and the following commit could contain the actual bug fix using `collectAndCheckIfEmitted`.
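A self-contained sketch of how such a wrapper interface might be used is shown below. The interface and method names come from the comment above, but the `int` record type, the `ChainingOutputSketch`, `RecordWriterOutputSketch` and `CountingBroadcasterSketch` classes, and the counter handling are all assumptions made for illustration. None of this is Flink's actual code.

```java
// Name taken from the review comment; the signature is simplified and assumed.
interface OutputWithRecordsEmittedCheck {
    /** Forwards the record and reports whether it was emitted from the task. */
    boolean collectAndCheckIfEmitted(int record);
}

// Hypothetical chained output: hands the record to the next operator in the chain.
final class ChainingOutputSketch implements OutputWithRecordsEmittedCheck {
    private final java.util.function.IntConsumer chainedOperator;

    ChainingOutputSketch(java.util.function.IntConsumer chainedOperator) {
        this.chainedOperator = chainedOperator;
    }

    @Override
    public boolean collectAndCheckIfEmitted(int record) {
        chainedOperator.accept(record);
        return false; // the record stays inside the task
    }
}

// Hypothetical non-chained output: stands in for writing to a record writer.
final class RecordWriterOutputSketch implements OutputWithRecordsEmittedCheck {
    private int written;

    @Override
    public boolean collectAndCheckIfEmitted(int record) {
        written++;
        return true; // the record leaves the task
    }

    int getWritten() {
        return written;
    }
}

// Hypothetical broadcaster: no instanceof or if/else on the output type is needed.
final class CountingBroadcasterSketch {
    private final OutputWithRecordsEmittedCheck[] outputs;
    private long numRecordsOut;

    CountingBroadcasterSketch(OutputWithRecordsEmittedCheck[] outputs) {
        this.outputs = outputs;
    }

    void collect(int record) {
        for (OutputWithRecordsEmittedCheck output : outputs) {
            if (output.collectAndCheckIfEmitted(record)) {
                numRecordsOut++;
            }
        }
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }
}
```

The point of the design in this sketch is that each output decides for itself whether a record counts as emitted, so the broadcaster no longer needs to know which concrete output types exist.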





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188921664


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   > ditto for the multi and two input case. (nit: Also it would help if you could extract the components of the total expected output records count into separate local/named variables)
   
   This is really a good suggestion.
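One way the extraction into named variables could look is sketched below. The arithmetic is the expression from the assertion in the diff above; the variable names and the meaning attached to each term (tagged side output, untagged output, duplicating operator) are an assumed reading of the test topology, and `ExpectedRecordsOutSketch` is a hypothetical helper.

```java
// Hypothetical helper; the three components mirror the terms of the assertion in the test.
final class ExpectedRecordsOutSketch {

    static int expectedNumRecordsOut(int numEvenRecords, int numOddRecords) {
        int totalRecords = numOddRecords + numEvenRecords;

        // Assumed meaning: records routed to the output tagged "odd".
        int taggedSideOutputRecords = numOddRecords;

        // Assumed meaning: every record is written once to the untagged non-chained output.
        int untaggedOutputRecords = totalRecords;

        // Assumed meaning: DuplicatingOperator emits every record twice.
        int duplicatedRecords = totalRecords * 2;

        return taggedSideOutputRecords + untaggedOutputRecords + duplicatedRecords;
    }
}
```

With the values used in the test (`numEvenRecords = 5`, `numOddRecords = 3`), this evaluates to 3 + 8 + 16 = 27.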
   
   





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1189760303


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##########
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   > If the above is correct, is this construct officially supported behaviour of Flink? Can user create such pipelines? If not, I would either remove the first operator, or replace it with just a no-op/pass-through operator that just forwards input records. 
   
   Fair enough! This is indeed not a conventional pipeline. I have replaced the first operator with a `PassThroughOperator`.
   





[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1185745407


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithRecordsCountCheck.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/** This is a wrapper for outputs to check whether its record output needs to be counted. */
+@Internal
+public interface OutputWithRecordsCountCheck<OUT> {

Review Comment:
   Good point! But I made it extend `WatermarkGaugeExposingOutput` instead of `Output`, because the return value of `org.apache.flink.streaming.runtime.tasks.OperatorChain#createOutputCollector` is a `WatermarkGaugeExposingOutput`, which makes it easier to handle. WDYT?
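The inheritance choice described above can be illustrated with a small sketch. All types here are hypothetical stand-ins with a `Sketch` suffix, the method names `currentWatermark` and `collectAndCheckIfCounted` are invented for the example, and the real Flink interfaces have more members than shown.

```java
// Hypothetical stand-ins; the real Flink interfaces are richer than this.
interface OutputSketch<T> {
    void collect(T record);
}

interface WatermarkGaugeExposingOutputSketch<T> extends OutputSketch<T> {
    long currentWatermark(); // stand-in for exposing the watermark gauge
}

// Extending the gauge-exposing type (rather than the plain output type) means that
// implementations can be returned wherever the gauge-exposing type is expected.
interface OutputWithRecordsCountCheckSketch<T> extends WatermarkGaugeExposingOutputSketch<T> {
    boolean collectAndCheckIfCounted(T record);
}

final class CountedOutputSketch implements OutputWithRecordsCountCheckSketch<Integer> {
    private int collected;

    @Override
    public void collect(Integer record) {
        collected++;
    }

    @Override
    public boolean collectAndCheckIfCounted(Integer record) {
        collect(record);
        return true; // this sketch always treats the record as counted
    }

    @Override
    public long currentWatermark() {
        return Long.MIN_VALUE; // no watermark has been seen in this sketch
    }

    int getCollected() {
        return collected;
    }
}

final class OperatorChainSketch {
    // The factory keeps its gauge-exposing return type; no adapter or cast is needed.
    static WatermarkGaugeExposingOutputSketch<Integer> createOutputCollector() {
        return new CountedOutputSketch();
    }
}
```

Had the new interface extended only the plain output type in this sketch, `createOutputCollector` would need either a different return type or an extra wrapper to keep exposing the watermark gauge.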


