Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/10 13:48:01 UTC

[GitHub] [flink] liming30 opened a new pull request #13109: [FLINK-18808][runtime/metrics] Task level numRecordsOut metric increases the output of non-end operators

liming30 opened a new pull request #13109:
URL: https://github.com/apache/flink/pull/13109


   ## What is the purpose of the change
   
   *If an operator has nonChainedOutputs, then its corresponding output should be counted in the task-level numRecordsOut metric.*
   
   ## Brief change log
   
   *The task-level numRecordsOut metric now also counts the output of non-end operators.*
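
A minimal sketch of the counting rule described above, not the code in this pull request. `TaskMetricSketch`, `Op`, and both method names are hypothetical, and the "tail only" variant assumes, as the title suggests, that only the last operator in the chain was counted before this change.

```java
import java.util.List;

/** Illustrative model only; these are not Flink classes. */
class TaskMetricSketch {

    /** One operator in a chain and how many non-chained (network) outputs it has. */
    static final class Op {
        final String name;
        final int nonChainedOutputs;

        Op(String name, int nonChainedOutputs) {
            this.name = name;
            this.nonChainedOutputs = nonChainedOutputs;
        }
    }

    /** Assumed earlier behaviour: only the last operator in the chain contributes. */
    static long countTailOnly(List<Op> chain, long recordsPerOutput) {
        Op tail = chain.get(chain.size() - 1);
        return recordsPerOutput * tail.nonChainedOutputs;
    }

    /** Behaviour described by this change: every operator with non-chained outputs contributes. */
    static long countAllNonChained(List<Op> chain, long recordsPerOutput) {
        long total = 0;
        for (Op op : chain) {
            total += recordsPerOutput * op.nonChainedOutputs;
        }
        return total;
    }
}
```

In this model, a chain whose head and tail each have one non-chained output reports twice as many records under the second rule as under the first.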
   
   
   ## Verifying this change
   
   This change modified tests and can be verified as follows:
   
     - *OneInputStreamTaskTest#testOperatorMetricReuse*
     - *TwoInputStreamTaskTest#testOperatorMetricReuse*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-726795330


   @liming30 do you still intend to work on this issue?





[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * b43e9eda0e94696b0563922ecb37b202bca47af5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6222) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r484508922



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputsCollector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Map;
+
+/**
+ * The selected outputs collector will send records to the default output,
+ * and output matching outputNames.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+public class DirectedOutputsCollector<T> implements SelectedOutputsCollector<T> {
+
+	private final Output<StreamRecord<T>>[] selectAllOutputs;
+	private final Map<String, Output<StreamRecord<T>>[]> outputMap;
+
+	public DirectedOutputsCollector(
+		Output<StreamRecord<T>>[] selectAllOutputs,
+		Map<String, Output<StreamRecord<T>>[]> outputMap) {
+		this.selectAllOutputs = selectAllOutputs;
+		this.outputMap = outputMap;
+	}
+
+	@Override
+	public boolean collect(Iterable<String> outputNames, StreamRecord<T> record) {
+		boolean emitted = false;
+
+		if (selectAllOutputs.length > 0) {
+			collect(selectAllOutputs, record);
+			emitted = true;
+		}
+
+		for (String outputName : outputNames) {
+			Output<StreamRecord<T>>[] outputList = outputMap.get(outputName);
+			if (outputList != null && outputList.length > 0) {
+				collect(outputList, record);
+				emitted = true;
+			}
+		}

Review comment:
       Thanks for pointing this out. I'm trying to get an answer.







[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * 8c9ec1afe6f942c5642569b125cd1ed71d2c5839 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * 8c9ec1afe6f942c5642569b125cd1ed71d2c5839 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366) 
   * 775d504a5713a48cfc59c98b4ec26cde75a60002 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * 8c9ec1afe6f942c5642569b125cd1ed71d2c5839 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366) 
   * 775d504a5713a48cfc59c98b4ec26cde75a60002 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * 76f5d0cbc997d1b0f6c47345900e09abed2f52bb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928) 
   * 3d854c62355f9049062a7ae6a908dcceecd9c213 UNKNOWN
   



[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r478424895



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/SelectedOutputsCollectorImpl.java
##########
@@ -0,0 +1,62 @@
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Map;
+
+/**
+ * The selected outputs collector will send records to the default output,
+ * and output matching outputNames.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+public class SelectedOutputsCollectorImpl<T> implements SelectedOutputsCollector<T> {
+
+	private final Output<StreamRecord<T>>[] selectAllOutputs;
+	private final Map<String, Output<StreamRecord<T>>[]> outputMap;
+
+	private final boolean objectReuse;
+
+	public SelectedOutputsCollectorImpl(
+		Output<StreamRecord<T>>[] selectAllOutputs,
+		Map<String, Output<StreamRecord<T>>[]> outputMap,
+		boolean objectReuse) {
+		this.selectAllOutputs = selectAllOutputs;
+		this.outputMap = outputMap;
+		this.objectReuse = objectReuse;
+	}
+
+	@Override
+	public boolean collect(Iterable<String> outputNames, StreamRecord<T> record) {
+		boolean emitted = false;
+
+		if (selectAllOutputs.length > 0) {
+			collect(selectAllOutputs, record);
+			emitted = true;
+		}
+

Review comment:
       In the old `Set`-based implementation, a record was sent to an `output` only once, even if that `output` was selected several times through `outputNames`. Now the record is sent once per match, and I am not sure whether this behavior is correct.
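
A minimal sketch of the difference raised in this comment, using plain strings in place of outputs. `DuplicateEmissionSketch` and its two methods are hypothetical and only model the selection step; they are not the classes under review.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative model only; outputs are represented by their names. */
class DuplicateEmissionSketch {

    /** Set-based selection: an output matched by several names still receives the record once. */
    static List<String> emitViaSet(Map<String, List<String>> outputMap, Iterable<String> outputNames) {
        Set<String> selected = new LinkedHashSet<>();
        for (String name : outputNames) {
            List<String> outputs = outputMap.get(name);
            if (outputs != null) {
                selected.addAll(outputs);
            }
        }
        return new ArrayList<>(selected);
    }

    /** Per-name emission: an output matched by several names receives the record once per match. */
    static List<String> emitPerName(Map<String, List<String>> outputMap, Iterable<String> outputNames) {
        List<String> emissions = new ArrayList<>();
        for (String name : outputNames) {
            List<String> outputs = outputMap.get(name);
            if (outputs != null) {
                emissions.addAll(outputs);
            }
        }
        return emissions;
    }
}
```

If names `a` and `b` both map to the same output, the first method emits to it once and the second emits to it twice, which is the behavioural change in question.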







[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r477237731



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -409,7 +410,7 @@ public OP getHeadOperator() {
 
 		if (selectors == null || selectors.isEmpty()) {
 			// simple path, no selector necessary
-			if (allOutputs.size() == 1) {
+			if (allOutputs.size() == 1 && !(allOutputs.get(0).f0 instanceof RecordWriterOutput)) {

Review comment:
       If we use `CountingOutput` directly, we can’t tell whether the record was actually sent (for example, a side output may filter it out because its `OutputTag` does not match).
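
A minimal sketch of why a counting wrapper alone over-counts, under the assumption that an output silently drops records whose tag does not match. All names here (`CountingSketch`, `TaggedOutput`, `BlindCounter`, `AwareCounter`) are hypothetical and only model the idea; they are not Flink's `CountingOutput` or `RecordWriterOutput`.

```java
import java.util.Objects;

/** Illustrative model only; names are hypothetical, not Flink's API. */
class CountingSketch {

    /** An output bound to an optional tag; a null tag stands for the main output. */
    static final class TaggedOutput {
        private final String tag;
        long sent = 0;

        TaggedOutput(String tag) {
            this.tag = tag;
        }

        /** Returns true only when the record's tag matches and the record is sent. */
        boolean collect(String recordTag, String record) {
            if (!Objects.equals(tag, recordTag)) {
                return false; // silently filtered, nothing leaves the task
            }
            sent++;
            return true;
        }
    }

    /** Counts every call, so filtered records inflate the metric. */
    static final class BlindCounter {
        private final TaggedOutput inner;
        long count = 0;

        BlindCounter(TaggedOutput inner) {
            this.inner = inner;
        }

        void collect(String recordTag, String record) {
            count++;
            inner.collect(recordTag, record);
        }
    }

    /** Counts only when the wrapped output reports that it sent the record. */
    static final class AwareCounter {
        private final TaggedOutput inner;
        long count = 0;

        AwareCounter(TaggedOutput inner) {
            this.inner = inner;
        }

        void collect(String recordTag, String record) {
            if (inner.collect(recordTag, record)) {
                count++;
            }
        }
    }
}
```

The second wrapper needs the output to report whether it emitted, which is why `collect` returns a `boolean` in this model.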







[GitHub] [flink] pnowojski commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r475525041



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
##########
@@ -36,31 +38,56 @@
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public CopyingDirectedOutput(
 			List<OutputSelector<OUT>> outputSelectors,
-			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
-		super(outputSelectors, outputs);
+			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs,
+			Counter numRecordsOutForTask) {
+		super(outputSelectors, outputs, numRecordsOutForTask);
 	}
 
 	@Override
 	public void collect(StreamRecord<OUT> record) {
-		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+		Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectedOutputs = selectOutputs(record);

Review comment:
       There are two things that I'm not sure about here.
   
   1. `Tuple2` I think is not the best option. For one thing 
   ```
   !selectedOutputs.f0.isEmpty()
   ```
   or 
   ```
   protected Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectOutputs(StreamRecord<OUT> record)
   ```
   are not very readable. Also this is one more object allocation (`Tuple2`). I think it would be better to have two methods:
   ```
   Set<...> selectChainedOutputs(record) {...}
   Set<...> selectNonChainedOutputs(record) {...}
   ```
   
   2. Returning a new `Set<...>` from every call I think is also very expensive and unnecessary. This is a pre-existing problem, but maybe we could fix it? Instead of the `selectOutputs` we could have:
   
   ```
   void collectToChainedOutputs(StreamRecord<...> record, SelectedOutputsCollector<...> collector) {...}
   void selectNonChainedOutputs(StreamRecord<...> record, SelectedOutputsCollector<...> collector) {...}
   ```
   Where
   ```
   public interface SelectedOutputsCollector {
       void collect(Output<StreamRecord<...>> output, StreamRecord<...> record);
   }
   ```
   `SelectedOutputsCollector` would have four versions (or maybe two versions + option to configure the other two? or maybe just a single implementation with 2 boolean flags?) for:
   
   - emitting without bumping `numRecordsOutForTask` without a copy
   - emitting without bumping `numRecordsOutForTask` with a copy
   - emitting with bumping `numRecordsOutForTask` without a copy
   - emitting with bumping `numRecordsOutForTask` with a copy
   
   I think it can make the code cleaner, while also saving on allocations. The one downside would be that we would lose the optimisation for avoiding shallow copying for the last output, but that's just one extra allocation of `StreamRecord`. However, that should be outweighed by the savings on `Set` allocations.
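
A minimal sketch of the "single implementation with 2 boolean flags" option mentioned above. `Record`, `Output`, and `FlaggedCollector` are hypothetical stand-ins chosen for illustration (the real types would be `StreamRecord` and Flink's `Output`), and the counter is a plain `long` rather than a metrics `Counter`.

```java
/** Illustrative model only; types are simplified stand-ins. */
class CollectorSketch {

    /** Minimal stand-in for a stream record. */
    static final class Record {
        final String value;

        Record(String value) {
            this.value = value;
        }

        /** Shallow copy, standing in for copying a record before handing it on. */
        Record copy() {
            return new Record(value);
        }
    }

    /** Minimal stand-in for an output. */
    interface Output {
        void collect(Record record);
    }

    /** The callback shape proposed in the review comment. */
    interface SelectedOutputsCollector {
        void collect(Output output, Record record);
    }

    /** One implementation covering all four combinations through two flags. */
    static final class FlaggedCollector implements SelectedOutputsCollector {
        private final boolean copyRecord;
        private final boolean bumpCounter;
        long numRecordsOut = 0;

        FlaggedCollector(boolean copyRecord, boolean bumpCounter) {
            this.copyRecord = copyRecord;
            this.bumpCounter = bumpCounter;
        }

        @Override
        public void collect(Output output, Record record) {
            if (bumpCounter) {
                numRecordsOut++;
            }
            output.collect(copyRecord ? record.copy() : record);
        }
    }
}
```

Because the selector calls the collector once per matching output, no intermediate `Set` is allocated per record; the trade-off noted above (one extra copy for the last output) applies whenever `copyRecord` is true.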







[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r477242251



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -739,40 +749,59 @@ public void collect(StreamRecord<T> record) {
 
 	static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       Okay, I will rebase on it.







[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * 3a7e71c54243a063412da38da4be35c4f3b2d175 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109) 
   * b43e9eda0e94696b0563922ecb37b202bca47af5 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   ## CI report:
   
   * 3d854c62355f9049062a7ae6a908dcceecd9c213 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932) 
   * 87ec25bce258c9fc953084701dd3acf7d96ac9e2 UNKNOWN
   



[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r478431177



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/SelectedOutputsCollectorImpl.java
##########
@@ -0,0 +1,61 @@
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Map;
+
+/**
+ * The selected outputs collector will send records to the default output,
+ * and output matching outputNames.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+public class SelectedOutputsCollectorImpl<T> implements SelectedOutputsCollector<T> {
+
+	private final Output<StreamRecord<T>>[] selectAllOutputs;
+	private final Map<String, Output<StreamRecord<T>>[]> outputMap;
+
+	private final boolean objectReuse;
+
+	public SelectedOutputsCollectorImpl(
+		Output<StreamRecord<T>>[] selectAllOutputs,
+		Map<String, Output<StreamRecord<T>>[]> outputMap,
+		boolean objectReuse) {
+		this.selectAllOutputs = selectAllOutputs;
+		this.outputMap = outputMap;
+		this.objectReuse = objectReuse;
+	}
+
+	@Override
+	public boolean collect(Iterable<String> outputNames, StreamRecord<T> record) {
+		boolean emitted = false;
+
+		if (selectAllOutputs.length > 0) {
+			collect(selectAllOutputs, record);
+			emitted = true;
+		}
+
+		for (String outputName : outputNames) {
+			Output<StreamRecord<T>>[] outputList = outputMap.get(outputName);
+			if (outputList != null && outputList.length > 0) {
+				collect(outputList, record);
+				emitted = true;
+			}
+		}

Review comment:
       In the old implementation, which used a `Set`, a record was sent only once even if the same `output` appeared multiple times in `outputNames`. Now it is sent multiple times, and I am not sure whether this behavior is correct.
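
The difference described above can be illustrated with a minimal, self-contained sketch. It is not Flink code: outputs are modeled as plain strings, and the class and method names (`DuplicateOutputNamesSketch`, `emitPerName`, `emitDeduplicated`) are hypothetical. It only assumes that the same output name can occur more than once in `outputNames`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in, not Flink code: per-name emission versus set-based de-duplication. */
public class DuplicateOutputNamesSketch {

	/** Emits once per matching name, so a repeated name emits the record repeatedly. */
	public static List<String> emitPerName(
			Map<String, String> outputMap, Iterable<String> outputNames, String record) {
		List<String> emitted = new ArrayList<>();
		for (String name : outputNames) {
			String output = outputMap.get(name);
			if (output != null) {
				emitted.add(output + "<-" + record);
			}
		}
		return emitted;
	}

	/** Collects the selected outputs into a set first, so each output gets the record once. */
	public static List<String> emitDeduplicated(
			Map<String, String> outputMap, Iterable<String> outputNames, String record) {
		Set<String> selected = new LinkedHashSet<>();
		for (String name : outputNames) {
			String output = outputMap.get(name);
			if (output != null) {
				selected.add(output);
			}
		}
		List<String> emitted = new ArrayList<>();
		for (String output : selected) {
			emitted.add(output + "<-" + record);
		}
		return emitted;
	}

	public static void main(String[] args) {
		Map<String, String> outputMap = new HashMap<>();
		outputMap.put("even", "outA");
		List<String> names = Arrays.asList("even", "even");

		System.out.println(emitPerName(outputMap, names, "r1"));      // [outA<-r1, outA<-r1]
		System.out.println(emitDeduplicated(outputMap, names, "r1")); // [outA<-r1]
	}
}
```

Whether the per-name behavior is acceptable depends on whether callers are expected to pass distinct names; the sketch only makes the difference visible.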







[GitHub] [flink] liming30 commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-689927793


   Okay, I understand. I will continue to pay attention to the progress of this issue.





[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 775d504a5713a48cfc59c98b4ec26cde75a60002 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3d854c62355f9049062a7ae6a908dcceecd9c213 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932) 
   * 87ec25bce258c9fc953084701dd3acf7d96ac9e2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951) 
   * 21aeb58971fb6408da21af37a4a1dfee35917484 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960) 
   



[GitHub] [flink] liming30 commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-675346712


   Hi @pnowojski, after our last discussion the new implementation is almost completely different from the previous submission, so I submitted it with **"git push --force"**. The tests related to side outputs are implemented separately.





[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 21aeb58971fb6408da21af37a4a1dfee35917484 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960) 
   * 3a7e71c54243a063412da38da4be35c4f3b2d175 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109",
       "triggerID" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 21aeb58971fb6408da21af37a4a1dfee35917484 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960) 
   * 3a7e71c54243a063412da38da4be35c4f3b2d175 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109) 
   



[GitHub] [flink] pnowojski commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-684919457


   Thanks @liming30 for the update. It's a bit difficult for me to review this now because of the commit structure. I cannot review the first commit on its own, as it is being fixed by the 3rd and 4th commits, and I cannot review all commits together, because the 2nd and 3rd commits contain non-functional changes (refactoring). The 3rd commit both fixes the first commit and contains a cleanup that removes the inefficient `Set<...> selectOutputs()` method.
   
   Could you restructure the first three commits in the following way ([following our coding style](https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html#separate-refactoring-cleanup-and-independent-changes)):
   
   1. my `[hotfix][task] Move output and collector helper classes out of OperatorChain` commit
   2. your introduction of `SelectedOutputsCollector` - a hotfix/optimisation of the pre-existing code
   3. your functional changes to the metrics counting (current 1st and part of the current 3rd commit)
   4. your `[hotfix] wrapping single RecordWriterOutput with RecordWriterCountingOutput`
   
   I'm not entirely sure whether the 4th commit should be squashed with the 3rd, but that can easily be done afterwards if needed. The first three commits are what currently cause me the most problems when reviewing.
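
For readers unfamiliar with the wrapping mentioned in item 4, the general idea is a decorator that increments a counter before delegating to the wrapped output. The following is only a sketch under stated assumptions: `SimpleOutput`, `CountingOutput`, and `countEmitted` are hypothetical names, the counter is a plain `AtomicLong` rather than a Flink metric, and none of this is the actual `RecordWriterCountingOutput` from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a counting decorator; not Flink's actual classes. */
public class CountingOutputSketch {

	/** Minimal stand-in for an output; Flink's real Output interface has more methods. */
	interface SimpleOutput<T> {
		void collect(T record);
	}

	/** Decorator that counts every record and then forwards it to the wrapped output. */
	static final class CountingOutput<T> implements SimpleOutput<T> {
		private final SimpleOutput<T> delegate;
		private final AtomicLong numRecordsOut;

		CountingOutput(SimpleOutput<T> delegate, AtomicLong numRecordsOut) {
			this.delegate = delegate;
			this.numRecordsOut = numRecordsOut;
		}

		@Override
		public void collect(T record) {
			numRecordsOut.incrementAndGet();
			delegate.collect(record);
		}
	}

	/** Sends the records through a counting wrapper and returns the counter value. */
	public static long countEmitted(List<String> records) {
		AtomicLong counter = new AtomicLong();
		List<String> sink = new ArrayList<>();
		SimpleOutput<String> out = new CountingOutput<>(sink::add, counter);
		for (String record : records) {
			out.collect(record);
		}
		return counter.get();
	}

	public static void main(String[] args) {
		System.out.println(countEmitted(Arrays.asList("a", "b", "c"))); // 3
	}
}
```

Keeping the counting in a wrapper leaves the wrapped output unchanged, which is one reason such a change can live in its own commit.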





[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87ec25bce258c9fc953084701dd3acf7d96ac9e2 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951) 
   * 21aeb58971fb6408da21af37a4a1dfee35917484 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960) 
   



[GitHub] [flink] pnowojski commented on pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671817954


   Thanks for the change @liming30 👍 I've left a couple of comments, mostly about tests.





[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 21aeb58971fb6408da21af37a4a1dfee35917484 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960) 
   



[GitHub] [flink] zentol closed pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
zentol closed pull request #13109:
URL: https://github.com/apache/flink/pull/13109


   





[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r477235414



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
##########
@@ -36,31 +38,56 @@
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public CopyingDirectedOutput(
 			List<OutputSelector<OUT>> outputSelectors,
-			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
-		super(outputSelectors, outputs);
+			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs,
+			Counter numRecordsOutForTask) {
+		super(outputSelectors, outputs, numRecordsOutForTask);
 	}
 
 	@Override
 	public void collect(StreamRecord<OUT> record) {
-		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+		Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectedOutputs = selectOutputs(record);

Review comment:
       Ok, I will modify this implementation by adding the `SelectedOutputsCollector` interface in the next submission.







[GitHub] [flink] liming30 commented on pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
liming30 commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671942530


   @pnowojski Thank you for helping to review the code. I have added some questions to the issue. Once they are resolved, I will resume working on the fix.





[GitHub] [flink] flinkbot commented on pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671368106


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 8c9ec1afe6f942c5642569b125cd1ed71d2c5839 (Mon Aug 10 13:54:18 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 775d504a5713a48cfc59c98b4ec26cde75a60002 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665) 
   * 76f5d0cbc997d1b0f6c47345900e09abed2f52bb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r475525041



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
##########
@@ -36,31 +38,56 @@
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public CopyingDirectedOutput(
 			List<OutputSelector<OUT>> outputSelectors,
-			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
-		super(outputSelectors, outputs);
+			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs,
+			Counter numRecordsOutForTask) {
+		super(outputSelectors, outputs, numRecordsOutForTask);
 	}
 
 	@Override
 	public void collect(StreamRecord<OUT> record) {
-		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+		Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectedOutputs = selectOutputs(record);

Review comment:
       There are two things that I'm not sure about here.
   
   1. I don't think `Tuple2` is the best option. For one thing,
   ```
   !selectedOutputs.f0.isEmpty()
   ```
   or
   ```
   protected Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectOutputs(StreamRecord<OUT> record)
   ```
   are not very readable. Also, this is one more object allocation (`Tuple2`). I think it would be better to have two methods:
   ```
   Set<...> selectChainedOutputs(record) {...}
   Set<...> selectNonChainedOutputs(record) {...}
   ```
   
   2. I think returning a new `Set<...>` from every call is also very expensive and unnecessary. This is a pre-existing problem, but maybe we could fix it? Instead of `selectOutputs` we could have:
   
   ```
   void collectToChainedOutputs(StreamRecord<...> record, SelectedOutputsCollector<...> collector) {...}
   void selectNonChainedOutputs(StreamRecord<...> record, SelectedOutputsCollector<...> collector) {...}
   ```
   Where
   ```
   public interface SelectedOutputsCollector {
       void collect(Output<StreamRecord<...>> output, StreamRecord<...> record);
   }
   ```
   `SelectedOutputsCollector` would have four versions (or maybe two versions plus an option to configure the other two? or maybe just a single implementation with two boolean flags?) for:
   
   - emitting without bumping `numRecordsOutForTask` without a copy
   - emitting without bumping `numRecordsOutForTask` with a copy
   - emitting with bumping `numRecordsOutForTask` without a copy
   - emitting with bumping `numRecordsOutForTask` with a copy
   
   I think it can make the code cleaner, while also saving on allocations. The one downside is that we would lose the optimisation that avoids shallow copying for the last output, but that's just one extra allocation of `StreamRecord`. However, that should be outweighed by the savings on `Set` allocations.
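   
   To make the "single implementation with two boolean flags" option concrete, here is a minimal sketch. It is not Flink code: `SketchRecord`, `SketchOutput`, `SketchCounter` and `FlaggedOutputsCollector` are hypothetical stand-ins for `StreamRecord`, `Output`, the metrics `Counter` and the proposed collector, and the real signatures may well end up different.
   
   ```java
   // Hypothetical stand-in for Flink's StreamRecord; copy() mimics a shallow copy.
   final class SketchRecord<T> {
       final T value;

       SketchRecord(T value) {
           this.value = value;
       }

       SketchRecord<T> copy() {
           return new SketchRecord<>(value);
       }
   }

   // Hypothetical stand-in for Output<StreamRecord<T>>.
   interface SketchOutput<T> {
       void collect(SketchRecord<T> record);
   }

   // Hypothetical stand-in for a metrics Counter.
   final class SketchCounter {
       private long count;

       void inc() {
           count++;
       }

       long getCount() {
           return count;
       }
   }

   interface SelectedOutputsCollector<T> {
       void collect(SketchOutput<T> output, SketchRecord<T> record);
   }

   // One implementation; the two flags cover the four variants listed above.
   final class FlaggedOutputsCollector<T> implements SelectedOutputsCollector<T> {
       private final boolean copyRecord;
       private final boolean countForTask;
       private final SketchCounter numRecordsOutForTask;

       FlaggedOutputsCollector(
               boolean copyRecord,
               boolean countForTask,
               SketchCounter numRecordsOutForTask) {
           this.copyRecord = copyRecord;
           this.countForTask = countForTask;
           this.numRecordsOutForTask = numRecordsOutForTask;
       }

       @Override
       public void collect(SketchOutput<T> output, SketchRecord<T> record) {
           if (countForTask) {
               // Bump the task-level metric only when this collector is configured to.
               numRecordsOutForTask.inc();
           }
           // Copy on every emission; this gives up the "no copy for the last output" shortcut.
           output.collect(copyRecord ? record.copy() : record);
       }
   }
   ```
   
   With this shape no `Set` is allocated per record; the selection logic just calls `collect` once for every output it picks.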

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -739,40 +749,59 @@ public void collect(StreamRecord<T> record) {
 
 	static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       Since you are modifying those classes and increasing their size, I would suggest moving them out of `OperatorChain` (`OperatorChain` is already a bit too big).
   
   I have already implemented just that as part of another independent effort. Could you cherry-pick this commit of mine:
   https://github.com/pnowojski/flink/commit/368de7dd0fb041e1231220d91434cc3a7a07f147
   
   and base your change on it?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -409,7 +410,7 @@ public OP getHeadOperator() {
 
 		if (selectors == null || selectors.isEmpty()) {
 			// simple path, no selector necessary
-			if (allOutputs.size() == 1) {
+			if (allOutputs.size() == 1 && !(allOutputs.get(0).f0 instanceof RecordWriterOutput)) {

Review comment:
       This change will always force the use of `BroadcastingOutputCollector`, which I would expect to have a performance impact. Why not simply use `CountingOutput` and/or reuse the operator-level metric in this case?
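   
   As a rough illustration of the counting-wrapper idea for the single-output fast path, assuming simplified stand-in types (`SketchOutput`, `SketchCounter` and `CountingSketchOutput` are hypothetical names, not Flink's actual `Output`, `Counter` or `CountingOutput`):
   
   ```java
   // Hypothetical stand-in for Flink's Output interface.
   interface SketchOutput<T> {
       void collect(T record);
   }

   // Hypothetical stand-in for a metrics Counter.
   final class SketchCounter {
       private long count;

       void inc() {
           count++;
       }

       long getCount() {
           return count;
       }
   }

   // Wraps the single output and bumps the counter on each record,
   // so no broadcasting collector is needed when there is only one output.
   final class CountingSketchOutput<T> implements SketchOutput<T> {
       private final SketchOutput<T> delegate;
       private final SketchCounter numRecordsOut;

       CountingSketchOutput(SketchOutput<T> delegate, SketchCounter numRecordsOut) {
           this.delegate = delegate;
           this.numRecordsOut = numRecordsOut;
       }

       @Override
       public void collect(T record) {
           numRecordsOut.inc();
           delegate.collect(record);
       }
   }
   ```
   
   The per-record cost here is one counter increment plus one delegated call, with no array iteration.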







[GitHub] [flink] pnowojski commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-716544838


   Hi @liming30, as https://github.com/apache/flink/pull/13343 has been merged, do you want to pick up this work?





[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r478836972



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputsCollector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Map;
+
+/**
+ * The selected outputs collector will send records to the default output,
+ * and output matching outputNames.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+public class DirectedOutputsCollector<T> implements SelectedOutputsCollector<T> {
+
+	private final Output<StreamRecord<T>>[] selectAllOutputs;
+	private final Map<String, Output<StreamRecord<T>>[]> outputMap;
+
+	public DirectedOutputsCollector(
+		Output<StreamRecord<T>>[] selectAllOutputs,
+		Map<String, Output<StreamRecord<T>>[]> outputMap) {
+		this.selectAllOutputs = selectAllOutputs;
+		this.outputMap = outputMap;
+	}
+
+	@Override
+	public boolean collect(Iterable<String> outputNames, StreamRecord<T> record) {
+		boolean emitted = false;
+
+		if (selectAllOutputs.length > 0) {
+			collect(selectAllOutputs, record);
+			emitted = true;
+		}
+
+		for (String outputName : outputNames) {
+			Output<StreamRecord<T>>[] outputList = outputMap.get(outputName);
+			if (outputList != null && outputList.length > 0) {
+				collect(outputList, record);
+				emitted = true;
+			}
+		}

Review comment:
       In the old implementation via `Set`, even if the same `output` appears multiple times in `outputNames`, it will only be sent once. Now it will be sent multiple times, and I am not sure if this behavior is correct.
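   
   The difference can be reproduced with a small stand-alone sketch. Outputs are represented by plain string ids here, and `SelectionSemanticsSketch` with its two methods is a hypothetical illustration, not the code in this PR:
   
   ```java
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   final class SelectionSemanticsSketch {

       // Old style: gather all selected outputs into a Set first,
       // so an output reachable through several names is emitted to once.
       static int emitViaSet(Map<String, List<String>> outputMap, Iterable<String> outputNames) {
           Set<String> selected = new LinkedHashSet<>();
           for (String name : outputNames) {
               List<String> outputs = outputMap.get(name);
               if (outputs != null) {
                   selected.addAll(outputs);
               }
           }
           return selected.size();
       }

       // New style: emit while iterating over the names,
       // so an output reachable through several names is emitted to once per name.
       static int emitPerName(Map<String, List<String>> outputMap, Iterable<String> outputNames) {
           int emissions = 0;
           for (String name : outputNames) {
               List<String> outputs = outputMap.get(name);
               if (outputs != null) {
                   emissions += outputs.size();
               }
           }
           return emissions;
       }
   }
   ```
   
   For example, if name `a` maps to `out1` and name `b` maps to `out1` and `out2`, selecting both names gives 2 emissions with the `Set` variant and 3 with the per-name variant.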







[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3d854c62355f9049062a7ae6a908dcceecd9c213 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932) 
   * 87ec25bce258c9fc953084701dd3acf7d96ac9e2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r468594920



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -160,7 +160,7 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
 			}
-			if (config.isChainEnd()) {
+			if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {

Review comment:
       You've missed the same change for `AbstractStreamOperatorV2` (I would expect `MultipleInputStreamTaskTest#testOperatorMetricReuse` to cover that).
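   
   For reference, the effect of the changed condition can be sketched in isolation. `ChainedOperatorSketch` and its fields are hypothetical and only model the two facts the check looks at; the real code reads them from `StreamConfig`:
   
   ```java
   // Hypothetical model of one operator inside a chain.
   final class ChainedOperatorSketch {
       final boolean chainEnd;
       final int nonChainedOutputCount;

       ChainedOperatorSketch(boolean chainEnd, int nonChainedOutputCount) {
           this.chainEnd = chainEnd;
           this.nonChainedOutputCount = nonChainedOutputCount;
       }

       // Old check: only the last operator of the chain feeds the task-level metric.
       boolean reusesTaskOutputMetricOld() {
           return chainEnd;
       }

       // New check: any operator that writes to a non-chained output feeds it.
       boolean reusesTaskOutputMetricNew() {
           return nonChainedOutputCount > 0;
       }
   }
   ```
   
   An operator in the middle of the chain with one non-chained output is skipped by the old check and picked up by the new one.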







[GitHub] [flink] pnowojski commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r484508922



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputsCollector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Map;
+
+/**
+ * The selected outputs collector will send records to the default output,
+ * and output matching outputNames.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+public class DirectedOutputsCollector<T> implements SelectedOutputsCollector<T> {
+
+	private final Output<StreamRecord<T>>[] selectAllOutputs;
+	private final Map<String, Output<StreamRecord<T>>[]> outputMap;
+
+	public DirectedOutputsCollector(
+		Output<StreamRecord<T>>[] selectAllOutputs,
+		Map<String, Output<StreamRecord<T>>[]> outputMap) {
+		this.selectAllOutputs = selectAllOutputs;
+		this.outputMap = outputMap;
+	}
+
+	@Override
+	public boolean collect(Iterable<String> outputNames, StreamRecord<T> record) {
+		boolean emitted = false;
+
+		if (selectAllOutputs.length > 0) {
+			collect(selectAllOutputs, record);
+			emitted = true;
+		}
+
+		for (String outputName : outputNames) {
+			Output<StreamRecord<T>>[] outputList = outputMap.get(outputName);
+			if (outputList != null && outputList.length > 0) {
+				collect(outputList, record);
+				emitted = true;
+			}
+		}

Review comment:
       I'm trying to figure this out.







[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109",
       "triggerID" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3a7e71c54243a063412da38da4be35c4f3b2d175 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109) 
   





[GitHub] [flink] pnowojski commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-688690309


   @liming30 I have good news and bad news. The good news is that there is no problem with `OutputSelector`. The bad news is that we have wasted quite a bit of time trying to address a problem in a component that @aljoscha and @dawidwys are just about to remove: https://github.com/apache/flink/pull/13343 ...
   
   Sorry for the confusion. I need to take a deeper look at the https://github.com/apache/flink/pull/13343 PR, but we should probably re-implement this change (fixing the `numRecordsOut` metric) on top of their work, as it looks like it will simplify the code quite a lot...





[GitHub] [flink] pnowojski commented on a change in pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r468389517



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -160,7 +160,7 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
 			if (config.isChainStart()) {

Review comment:
       Can you rename the commit to:
   ```
   [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.
   
   Before side outputs were ignored by the numRecordsOut metric.
   ```
   ?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
##########
@@ -41,13 +41,17 @@
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;

Review comment:
       If you are adding the test for both one and two inputs, I think you should also modify the `MultipleInputStreamTaskTest#testOperatorMetricReuse` test as well.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
##########
@@ -121,6 +124,91 @@ private void head(OperatorID headOperatorID) {
 			TypeSerializer<IN> inputSerializer,
 			TypeSerializer<OUT> outputSerializer,
 			boolean createKeyedStateBackend) {
+		return chain(operatorID,
+			operatorFactory,
+			inputSerializer,
+			outputSerializer,
+			createKeyedStateBackend,
+			null);
+	}
+
+	public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+		OperatorID operatorID,

Review comment:
       nit: (in many places) according to our [coding style](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements) parameters should be double-indented.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
##########
@@ -121,6 +124,91 @@ private void head(OperatorID headOperatorID) {
 			TypeSerializer<IN> inputSerializer,
 			TypeSerializer<OUT> outputSerializer,
 			boolean createKeyedStateBackend) {
+		return chain(operatorID,
+			operatorFactory,
+			inputSerializer,
+			outputSerializer,
+			createKeyedStateBackend,
+			null);
+	}
+
+	public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+		OperatorID operatorID,
+		OneInputStreamOperator<T, T> operator,
+		TypeSerializer<T> typeSerializer,
+		boolean createKeyedStateBackend,
+		List<StreamEdge> nonChainedOutputs) {
+		return chainWithNonChainedOutputs(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend, nonChainedOutputs);
+	}
+
+	public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+		OneInputStreamOperator<T, T> operator,
+		TypeSerializer<T> typeSerializer,
+		List<StreamEdge> nonChainedOutputs) {
+		return chainWithNonChainedOutputs(new OperatorID(), operator, typeSerializer, nonChainedOutputs);
+	}
+
+	public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+		OperatorID operatorID,
+		OneInputStreamOperator<T, T> operator,
+		TypeSerializer<T> typeSerializer,
+		List<StreamEdge> nonChainedOutputs) {
+		return chainWithNonChainedOutputs(operatorID, operator, typeSerializer, typeSerializer, false, nonChainedOutputs);
+	}
+
+	public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+		OneInputStreamOperatorFactory<T, T> operatorFactory,
+		TypeSerializer<T> typeSerializer,
+		List<StreamEdge> nonChainedOutputs) {
+		return chainWithNonChainedOutputs(new OperatorID(), operatorFactory, typeSerializer, nonChainedOutputs);
+	}
+
+	public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+		OperatorID operatorID,
+		OneInputStreamOperatorFactory<T, T> operatorFactory,
+		TypeSerializer<T> typeSerializer,
+		List<StreamEdge> nonChainedOutputs) {
+		return chainWithNonChainedOutputs(operatorID, operatorFactory, typeSerializer, typeSerializer, false, nonChainedOutputs);
+	}
+
+	private <IN, OUT> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(

Review comment:
       I think this builder class is growing a bit too big, with too many overloaded methods. I think we need to split it into a two-step builder pattern.
   
   Instead of providing many overloaded variants of `chain` and `chainWithNonChainedOutputs` (and maybe more in the future), I think it would be better to have a single `chain` method with just the obligatory parameters:
   ```			
   StreamConfigEdgeChainer chain(TypeSerializer<T> typeSerializer);
   ```
   (?)
   that returns `StreamConfigEdgeChainer` (different name?). `StreamConfigEdgeChainer` would be the second-level builder, allowing optional parameters such as the operator factory, nonChainedOutputs, createKeyedStateBackend, and so on to be set.
   
   Calling `StreamConfigEdgeChainer.build()` would return the first-level builder `StreamConfigChainer` again.
   
   
   ```
   testHarness
     .chain(typeSerializer1)
       .setOperatorID(...)
       .setOutputTypeSerializer(...)
       .build()
     .chain(typeSerializer2)
       .setNonChainedOutputsCount(2)
       .build()
     .finish();
   ```
   WDYT?
   
   (For the sake of backward compatibility, the existing `chain` methods could be left as they are (`@Deprecated`), so that we don't have to modify all of the existing tests.)
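   
   A minimal sketch of how the two-level builder could hang together, assuming simplified types: serializers and operator ids are plain strings, and `ChainerSketch`, `EdgeChainerSketch` and `OperatorSpecSketch` are hypothetical names rather than the existing test utilities.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical record of what one chain(...).build() call configured.
   final class OperatorSpecSketch {
       final String typeSerializer;
       final String operatorId;
       final int nonChainedOutputsCount;

       OperatorSpecSketch(String typeSerializer, String operatorId, int nonChainedOutputsCount) {
           this.typeSerializer = typeSerializer;
           this.operatorId = operatorId;
           this.nonChainedOutputsCount = nonChainedOutputsCount;
       }
   }

   // First-level builder: one chain(...) call per operator.
   final class ChainerSketch {
       private final List<OperatorSpecSketch> operators = new ArrayList<>();

       EdgeChainerSketch chain(String typeSerializer) {
           return new EdgeChainerSketch(this, typeSerializer);
       }

       void add(OperatorSpecSketch spec) {
           operators.add(spec);
       }

       List<OperatorSpecSketch> finish() {
           return operators;
       }
   }

   // Second-level builder: optional settings with defaults, then back to the parent.
   final class EdgeChainerSketch {
       private final ChainerSketch parent;
       private final String typeSerializer;
       private String operatorId = "generated";
       private int nonChainedOutputsCount = 0;

       EdgeChainerSketch(ChainerSketch parent, String typeSerializer) {
           this.parent = parent;
           this.typeSerializer = typeSerializer;
       }

       EdgeChainerSketch setOperatorID(String operatorId) {
           this.operatorId = operatorId;
           return this;
       }

       EdgeChainerSketch setNonChainedOutputsCount(int count) {
           this.nonChainedOutputsCount = count;
           return this;
       }

       ChainerSketch build() {
           parent.add(new OperatorSpecSketch(typeSerializer, operatorId, nonChainedOutputsCount));
           return parent;
       }
   }
   ```
   
   Each new optional parameter then costs one setter on the second-level builder instead of another family of overloads.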

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
##########
@@ -648,8 +649,16 @@ public void close() throws Exception {
 	public void testOperatorMetricReuse() throws Exception {
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
+		StreamEdge edge = new StreamEdge(
+			new StreamNode(2, null, null, (StreamOperator<?>) null, null, null, null),
+			new StreamNode(4 , null, null, (StreamOperator<?>) null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			new BroadcastPartitioner<Object>(),
+			null);
+

Review comment:
       The whole idea of `StreamConfigChainer` is to avoid those scary manual creations of `StreamEdge`s - this edge should be created automatically by the `StreamConfigChainer`.
   
   For example, a user of the `StreamConfigChainer` would just specify how many non-chained output edges there should be, and the `StreamConfigChainer` would create those edges automatically.
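   
   Roughly, the helper would turn a count into edges on its own. A sketch under simplifying assumptions: `EdgeSketch` only carries source and target ids, and the numbering scheme in `EdgeFactorySketch` is made up for illustration.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, stripped-down stand-in for StreamEdge.
   final class EdgeSketch {
       final int sourceId;
       final int targetId;

       EdgeSketch(int sourceId, int targetId) {
           this.sourceId = sourceId;
           this.targetId = targetId;
       }
   }

   final class EdgeFactorySketch {

       // Creates `count` edges from one source to consecutively numbered targets,
       // so the test only has to say how many non-chained outputs it wants.
       static List<EdgeSketch> createNonChainedOutputs(int sourceId, int firstTargetId, int count) {
           List<EdgeSketch> edges = new ArrayList<>(count);
           for (int i = 0; i < count; i++) {
               edges.add(new EdgeSketch(sourceId, firstTargetId + i));
           }
           return edges;
       }
   }
   ```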







[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3d854c62355f9049062a7ae6a908dcceecd9c213 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932) 
   * 87ec25bce258c9fc953084701dd3acf7d96ac9e2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951) 
   * 21aeb58971fb6408da21af37a4a1dfee35917484 UNKNOWN
   





[GitHub] [flink] liming30 commented on a change in pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r478431177



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/SelectedOutputsCollectorImpl.java
##########
@@ -0,0 +1,61 @@
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Map;
+
+/**
+ * The selected outputs collector will send records to the default output,
+ * and output matching outputNames.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+public class SelectedOutputsCollectorImpl<T> implements SelectedOutputsCollector<T> {
+
+	private final Output<StreamRecord<T>>[] selectAllOutputs;
+	private final Map<String, Output<StreamRecord<T>>[]> outputMap;
+
+	private final boolean objectReuse;
+
+	public SelectedOutputsCollectorImpl(
+		Output<StreamRecord<T>>[] selectAllOutputs,
+		Map<String, Output<StreamRecord<T>>[]> outputMap,
+		boolean objectReuse) {
+		this.selectAllOutputs = selectAllOutputs;
+		this.outputMap = outputMap;
+		this.objectReuse = objectReuse;
+	}
+
+	@Override
+	public boolean collect(Iterable<String> outputNames, StreamRecord<T> record) {
+		boolean emitted = false;
+
+		if (selectAllOutputs.length > 0) {
+			collect(selectAllOutputs, record);
+			emitted = true;
+		}
+
+		for (String outputName : outputNames) {
+			Output<StreamRecord<T>>[] outputList = outputMap.get(outputName);
+			if (outputList != null && outputList.length > 0) {
+				collect(outputList, record);
+				emitted = true;
+			}
+		}

Review comment:
       The old implementation gathered the selected outputs in a `Set`, so an `output` that was selected more than once through `outputNames` still received the record only once. This version sends the record once per match, and I am not sure that this behavior is correct.
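
To make the concern concrete, here is a minimal sketch of the two strategies. It is not the Flink code: plain strings stand in for `Output` instances, and the class and method names (`DuplicateEmitSketch`, `emitWithSet`, `emitWithLoop`) are hypothetical. It assumes the old path deduplicated through a set and that the new path loops over `outputNames` as in the diff above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the collectors; strings play the role of outputs. */
public class DuplicateEmitSketch {

    /** Set-based selection: each matching output receives the record at most once. */
    public static List<String> emitWithSet(
            Map<String, String[]> outputMap, Iterable<String> outputNames) {
        // LinkedHashSet drops duplicates while keeping first-seen order.
        Set<String> selected = new LinkedHashSet<>();
        for (String name : outputNames) {
            String[] outputs = outputMap.get(name);
            if (outputs != null) {
                selected.addAll(Arrays.asList(outputs));
            }
        }
        return new ArrayList<>(selected);
    }

    /** Loop-based selection, shaped like the diff: one emission per matching name. */
    public static List<String> emitWithLoop(
            Map<String, String[]> outputMap, Iterable<String> outputNames) {
        List<String> emitted = new ArrayList<>();
        for (String name : outputNames) {
            String[] outputs = outputMap.get(name);
            if (outputs != null && outputs.length > 0) {
                emitted.addAll(Arrays.asList(outputs));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, String[]> outputMap = new HashMap<>();
        outputMap.put("even", new String[] {"out-A"});
        outputMap.put("small", new String[] {"out-A", "out-B"});

        // "even" is repeated, and "out-A" is reachable through both names.
        List<String> names = Arrays.asList("even", "small", "even");

        System.out.println(emitWithSet(outputMap, names));  // [out-A, out-B]
        System.out.println(emitWithLoop(outputMap, names)); // [out-A, out-A, out-B, out-A]
    }
}
```

In this sketch the loop variant delivers the record to `out-A` three times, which is the difference in behavior I am asking about.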




----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8c9ec1afe6f942c5642569b125cd1ed71d2c5839 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] liming30 commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
liming30 commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-687535459


   Hi @pnowojski, I have rebased onto master and reorganized the commits.
   
   > Could you restructure first three commits in the following way ([following our coding style](https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html#separate-refactoring-cleanup-and-independent-changes)):
   > 
   > 1. my `[hotfix][task] Move output and collector helper classes out of OperatorChain` commit
   >
   The first commit has already been merged into master, so I removed it from this PR.
   > 2. your introduction of `SelectedOutputsCollector` - a hotfix/optimisation of the pre-existing code
   > 3. your functional changes to the metrics counting (current 1st and part of the current 3rd commit)
   > 4. your `[hotfix] wrapping single RecordWriterOutput with RecordWriterCountingOutput`
   >
   Only two commits remain: the first introduces `SelectedOutputsCollector`, and the second changes how the metrics are counted.


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3d854c62355f9049062a7ae6a908dcceecd9c213 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] flinkbot commented on pull request #13109: [FLINK-18808][runtime/metircs]]Task level numRecordsOut metric increases the output of non-end operators

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8c9ec1afe6f942c5642569b125cd1ed71d2c5839 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13109:
URL: https://github.com/apache/flink/pull/13109#issuecomment-671381707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5366",
       "triggerID" : "8c9ec1afe6f942c5642569b125cd1ed71d2c5839",
       "triggerType" : "PUSH"
     }, {
       "hash" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5665",
       "triggerID" : "775d504a5713a48cfc59c98b4ec26cde75a60002",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5928",
       "triggerID" : "76f5d0cbc997d1b0f6c47345900e09abed2f52bb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5932",
       "triggerID" : "3d854c62355f9049062a7ae6a908dcceecd9c213",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5951",
       "triggerID" : "87ec25bce258c9fc953084701dd3acf7d96ac9e2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5960",
       "triggerID" : "21aeb58971fb6408da21af37a4a1dfee35917484",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109",
       "triggerID" : "3a7e71c54243a063412da38da4be35c4f3b2d175",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b43e9eda0e94696b0563922ecb37b202bca47af5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6222",
       "triggerID" : "b43e9eda0e94696b0563922ecb37b202bca47af5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3a7e71c54243a063412da38da4be35c4f3b2d175 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6109) 
   * b43e9eda0e94696b0563922ecb37b202bca47af5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6222) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------