Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/24 16:29:17 UTC

[GitHub] [flink] pnowojski opened a new pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

pnowojski opened a new pull request #13234:
URL: https://github.com/apache/flink/pull/13234


   This PR makes it possible to chain `SourceOperator` with `MultipleInputStreamTask`. Checkpointing, watermarks, and metrics are not yet tested (that is part of another ticket).
   
   ## Brief change log
   
   Please check individual commit messages.
   
   ## Verifying this change
   This change adds `MultipleInputStreamTaskChainedSourcesTest` to cover the new feature.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478496041



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       In that case the user should be calling `Output.collect(record)`, as `outputTag` is `@NonNull` (by default).
   
   I mean, we could do it, but do we want to support a nullable `outputTag`? Keep in mind that this is a public API, so we would need to change the behaviour of the public API and then keep supporting incorrect calls in the future.
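
A minimal sketch of the routing rule under discussion, assuming plain `String` tags in place of Flink's `OutputTag` and a list in place of the downstream operator. The class `TagRoutingSketch` and its members are hypothetical and not part of the PR; the two `collect` methods only mirror the conditions visible in the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a chaining output; not the PR's class.
final class TagRoutingSketch {
    // null means this output serves the main output, otherwise one side output.
    private final String ownTag;
    // Records that would have been pushed to the downstream operator.
    final List<String> forwarded = new ArrayList<>();

    TagRoutingSketch(String ownTag) {
        this.ownTag = ownTag;
    }

    // Main-output path: ignored when this output is bound to a side-output tag.
    void collect(String record) {
        if (ownTag != null) {
            return;
        }
        forwarded.add(record);
    }

    // Side-output path: forwarded only when the tags match.
    void collect(String tag, String record) {
        if (ownTag == null || !ownTag.equals(tag)) {
            return;
        }
        forwarded.add(record);
    }
}
```

In this sketch a caller that wants the main output uses the one-argument `collect`, so the tagged overload never needs to accept a null tag.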







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478510256



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -90,16 +91,19 @@
 
 	private final RecordWriterOutput<?>[] streamOutputs;
 
-	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
+	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
 
 	/**
 	 * For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing
-	 * feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and
+	 * feedback edges do not contain any operators, in which case, {@code mainOperatorWrapper} and
 	 * {@code tailOperatorWrapper} are null.
 	 */
-	@Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

Review comment:
       Nope. The main operator is as explained in the Javadoc above. The first operator can be either the main operator or some chained source operator (if present). I added a Javadoc with this explanation.
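
A rough sketch of how a "first" wrapper can fall out of linking, assuming the wrapper list is ordered tail-first (the hunk elsewhere in this thread sets `tailOperatorWrapper = allOperatorWrappers.get(0)`) and assuming a single linear chain. `WrapperLinkSketch`, `Wrapper`, and `linkTailFirst` are hypothetical names, not the PR's `linkOperatorWrappers` implementation.

```java
import java.util.List;

final class WrapperLinkSketch {
    /** Hypothetical stand-in for StreamOperatorWrapper: a name and two links. */
    static final class Wrapper {
        final String name;
        Wrapper previous;
        Wrapper next;

        Wrapper(String name) {
            this.name = name;
        }
    }

    /**
     * Links wrappers given in tail-first order and returns the last one visited,
     * which is the wrapper at the very start of the chain.
     */
    static Wrapper linkTailFirst(List<Wrapper> tailFirst) {
        Wrapper downstream = null;
        for (Wrapper current : tailFirst) {
            if (downstream != null) {
                downstream.previous = current;
            }
            current.next = downstream;
            downstream = current;
        }
        return downstream;
    }
}
```

With a list `[tail, main]` the returned wrapper is `main`; with `[tail, main, source]` it is `source`, which matches the description of the first operator being either the main operator or a chained source.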







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5847",
       "triggerID" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5890",
       "triggerID" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e5a866429bf22eb6aeeb26733e5ab1d705780e66",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e5a866429bf22eb6aeeb26733e5ab1d705780e66",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b6142f9dad4b3893629b2ec774c93adb5875e84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5892",
       "triggerID" : "1b6142f9dad4b3893629b2ec774c93adb5875e84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2382ecb7dcb679dddbc39f44717ed1c4c7c061cf",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5935",
       "triggerID" : "2382ecb7dcb679dddbc39f44717ed1c4c7c061cf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36d937d37f057632e795e7a7f4c477641bfb6c1e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6100",
       "triggerID" : "36d937d37f057632e795e7a7f4c477641bfb6c1e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 2382ecb7dcb679dddbc39f44717ed1c4c7c061cf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5935) 
   * 36d937d37f057632e795e7a7f4c477641bfb6c1e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6100) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r494813570



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -53,42 +53,48 @@ public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
 		ClassLoader userClassLoader = getUserCodeClassLoader();
 
-		TypeSerializer<?>[] inputDeserializers = configuration.getTypeSerializersIn(userClassLoader);
+		StreamConfig.Input[] inputs = configuration.getInputs(userClassLoader);
 
-		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[inputDeserializers.length];
-		WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputDeserializers.length];
+		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[
+			(int) Arrays.stream(inputs)
+				.filter(input -> (input instanceof StreamConfig.NetworkInput))
+				.count()];

Review comment:
       @rkhachatryan updated answer to your question:
   
   https://issues.apache.org/jira/browse/FLINK-19411
   
   No, it isn't the same :( `configuration.getNumberOfNetworkInputs` returns the number of input gates of the task, regardless of whether they are unioned or not.
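
A small sketch of the counting done in the hunk above, assuming hypothetical stand-in types for `StreamConfig.Input`, `StreamConfig.NetworkInput`, and the chained source input. Only the stream pipeline is taken from the diff; `NetworkInputCountSketch` and its nested types are illustrative.

```java
import java.util.Arrays;

final class NetworkInputCountSketch {
    // Hypothetical stand-ins; the real Flink classes carry serializers and configuration.
    interface Input {}
    static final class NetworkInput implements Input {}
    static final class SourceInput implements Input {}

    /** Counts only the inputs fed from the network, skipping chained sources. */
    static int countNetworkInputs(Input[] inputs) {
        return (int) Arrays.stream(inputs)
            .filter(input -> input instanceof NetworkInput)
            .count();
    }
}
```

The count here is per logical input of the operator, which is why it can differ from a count of input gates when several gates are unioned into one input.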







[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r494813570



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -53,42 +53,48 @@ public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
 		ClassLoader userClassLoader = getUserCodeClassLoader();
 
-		TypeSerializer<?>[] inputDeserializers = configuration.getTypeSerializersIn(userClassLoader);
+		StreamConfig.Input[] inputs = configuration.getInputs(userClassLoader);
 
-		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[inputDeserializers.length];
-		WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputDeserializers.length];
+		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[
+			(int) Arrays.stream(inputs)
+				.filter(input -> (input instanceof StreamConfig.NetworkInput))
+				.count()];

Review comment:
       Updated answer @rkhachatryan, [no it's not the same](https://issues.apache.org/jira/browse/FLINK-19411) :(







[GitHub] [flink] flinkbot commented on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5847",
       "triggerID" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5890",
       "triggerID" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e5a866429bf22eb6aeeb26733e5ab1d705780e66",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e5a866429bf22eb6aeeb26733e5ab1d705780e66",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b6142f9dad4b3893629b2ec774c93adb5875e84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5892",
       "triggerID" : "1b6142f9dad4b3893629b2ec774c93adb5875e84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 705b359ef261cf3c6b1e42c418e7955c90007171 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5890) 
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 1b6142f9dad4b3893629b2ec774c93adb5875e84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5892) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r482286576



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       Great, thanks!







[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479136421



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -90,16 +91,19 @@
 
 	private final RecordWriterOutput<?>[] streamOutputs;
 
-	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
+	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
 
 	/**
 	 * For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing
-	 * feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and
+	 * feedback edges do not contain any operators, in which case, {@code mainOperatorWrapper} and
 	 * {@code tailOperatorWrapper} are null.
 	 */
-	@Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

Review comment:
       I'm confused :)
   
   Is it something like this:
   ```
   first
         \ main (multi-input) -> ... -> tail
         /
   second
   ```
   ?
   If yes, how would wrappers be linked? Will `firstOperatorWrapper.close` also close the second operator?
   
   nit: would be nice to have that kind of diagram in code too.







[GitHub] [flink] flinkbot commented on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679234288


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50 (Mon Aug 24 16:31:35 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479150979



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +212,97 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSources = Collections.emptyMap();
+
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+			List<StreamEdge> outEdgesInOrder,
+			RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+			Map<Integer, StreamConfig> chainedConfigs,
+			StreamTask<OUT, OP> containingTask,
+			Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private Map<SourceInputConfig, ChainedSource> createChainedInputs(

Review comment:
       This method as well as `createChainedInputs` is full of compiler warnings too - can you fix them please?







[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478509078



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(
+			StreamTask<OUT, OP> containingTask,
+			StreamConfig.Input[] configuredInputs,
+			Map<Integer, StreamConfig> chainedConfigs,
+			ClassLoader userCodeClassloader,
+			List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
+		if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof SourceInput)) {
+			return new ChainedSourceOutputs();
+		}
+		checkState(
+			mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,

Review comment:
       It is, but it would be an unsupported configuration. Chained sources are currently supported only with `MultipleInputStreamOperator`, not with, for example, `TwoInputStreamOperator`.
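
A minimal sketch of the guard described in this comment, for readers who do not have the full `OperatorChain` source at hand. All types below (`Operator`, `MultiInputOperator`, `TwoInputOperator`, `ConfiguredInput`, `NetworkInputStub`, `SourceInputStub`, `ChainedSourceGuard`) are hypothetical stand-ins invented for illustration, not Flink's classes, and the real method returns the chained source outputs rather than a boolean.

```java
import java.util.Arrays;

// Hypothetical stand-ins for illustration only; these are not Flink's interfaces.
interface Operator {}
interface MultiInputOperator extends Operator {}
interface TwoInputOperator extends Operator {}

interface ConfiguredInput {}
final class NetworkInputStub implements ConfiguredInput {}
final class SourceInputStub implements ConfiguredInput {}

final class ChainedSourceGuard {
	/** Returns true if chained sources must be created, false if none are configured. */
	static boolean requiresChainedSources(Operator mainOperator, ConfiguredInput[] inputs) {
		// No chained source configured: any operator type is acceptable.
		if (Arrays.stream(inputs).noneMatch(input -> input instanceof SourceInputStub)) {
			return false;
		}
		// Chained sources present: only the multiple-input operator is supported.
		if (!(mainOperator instanceof MultiInputOperator)) {
			throw new IllegalStateException(
				"Chained sources are only supported with a multiple-input operator");
		}
		return true;
	}
}
```

With these stand-ins, a two-input operator passes the guard as long as no source input is configured, and fails fast as soon as one is.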







[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479061227



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       It's unclear to me what the contract and the intended behavior are.
   If
   > outputTag is @NonNull (by default)
   
   then we probably should check for it.
   Currently, we'll just drop the record.
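
One possible reading of this suggestion, sketched with simplified stand-in types. `Tag` and `TaggedOutputSketch` are hypothetical names used only for illustration (records are plain strings here), and whether a null tag should fail or be ignored is exactly the open contract question, so the fail-fast choice below is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified stand-in for a side-output tag; not Flink's OutputTag.
final class Tag {
	final String id;

	Tag(String id) {
		this.id = Objects.requireNonNull(id);
	}

	@Override
	public boolean equals(Object o) {
		return o instanceof Tag && ((Tag) o).id.equals(id);
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}
}

final class TaggedOutputSketch {
	private final Tag ownTag; // null means "this output serves the main output"
	final List<String> forwarded = new ArrayList<>();

	TaggedOutputSketch(Tag ownTag) {
		this.ownTag = ownTag;
	}

	/** Side-output path with an explicit null check on the argument. */
	void collect(Tag tag, String record) {
		// Fail fast instead of silently dropping a record sent with a null tag.
		Objects.requireNonNull(tag, "outputTag must not be null");
		if (ownTag == null || !ownTag.equals(tag)) {
			// Not responsible for this side output: the record is dropped here.
			return;
		}
		forwarded.add(record);
	}
}
```

Without the `requireNonNull` line, a null tag would fall into the `!ownTag.equals(tag)` branch and the record would be dropped, which is the current behavior this comment points at.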







[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r482015795



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       Done. Please take a look at the commits:
   
   - [FLINK-18905][task/datastream] Convert OneInputStreamOperator to Input
   - fixup! [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask







[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479115224



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(
+			StreamTask<OUT, OP> containingTask,
+			StreamConfig.Input[] configuredInputs,
+			Map<Integer, StreamConfig> chainedConfigs,
+			ClassLoader userCodeClassloader,
+			List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
+		if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof SourceInput)) {
+			return new ChainedSourceOutputs();
+		}
+		checkState(
+			mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,

Review comment:
       Makes sense.







[GitHub] [flink] pnowojski commented on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-686385804


   Thanks, I've extended `MultipleInputStreamTaskChainedSourcesTest#testLifeCycleOrder` as we discussed offline. I will merge once the Azure build is green.





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r481282932



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       > (I think the delegation overhead will be eliminated by JVM, but it can be tested).
   
   Would it? The JVM is terrible at call-site optimisations, so I think this would add another virtual call.
   
   But let me try out unifying `Input` with `OneInputStreamOperator`.







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5847",
       "triggerID" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9063c7aff55eee4e446dddff9d74e83ab692f3d4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5847) 
   * 705b359ef261cf3c6b1e42c418e7955c90007171 UNKNOWN
   





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r494824026



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -53,42 +53,48 @@ public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
 		ClassLoader userClassLoader = getUserCodeClassLoader();
 
-		TypeSerializer<?>[] inputDeserializers = configuration.getTypeSerializersIn(userClassLoader);
+		StreamConfig.Input[] inputs = configuration.getInputs(userClassLoader);
 
-		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[inputDeserializers.length];
-		WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputDeserializers.length];
+		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[
+			(int) Arrays.stream(inputs)
+				.filter(input -> (input instanceof StreamConfig.NetworkInput))
+				.count()];

Review comment:
       Updated answer @rkhachatryan, [no it's not the same](https://issues.apache.org/jira/browse/FLINK-19411) :(







[GitHub] [flink] pnowojski commented on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-685694228


   Done, @rkhachatryan can you take a look again?





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828) 
   * 9063c7aff55eee4e446dddff9d74e83ab692f3d4 UNKNOWN
   





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478490967



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() {
 		}
 	}
 
-	public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
-		config.setInteger(TYPE_SERIALIZERS_IN_COUNT, serializers.length);
-		for (int i = 0; i < serializers.length; i++) {
-			setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), serializers[i]);
-		}
-	}
-
 	public void setTypeSerializerOut(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
 
+	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
+	}
+
 	public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
 	}
 
-	@Deprecated
-	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-		return getTypeSerializerIn(0, cl);
+	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
+		try {
+			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize type serializer.", e);
+		}
 	}
 
-	@Deprecated
-	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-		return getTypeSerializerIn(1, cl);
+	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
+		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
 	}
 
-	public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) {
-		int typeSerializersCount = config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1);
-		checkState(
-			typeSerializersCount >= 0,
-			"Missing value for %s in the config? [%d]",
-			TYPE_SERIALIZERS_IN_COUNT,
-			typeSerializersCount);
-		TypeSerializer<?>[] typeSerializers = new TypeSerializer<?>[typeSerializersCount];
-		for (int i = 0; i < typeSerializers.length; i++) {
-			typeSerializers[i] = getTypeSerializerIn(i, cl);
+	public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
+		Input[] inputs = new Input[serializers.length];
+		for (int i = 0; i < serializers.length; i++) {
+			inputs[i] = new NetworkInput(serializers[i], i);
 		}
-		return typeSerializers;
+		setInputs(inputs);
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
+	public void setInputs(Input ...inputs) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(
-				this.config,
-				String.format(TYPE_SERIALIZERS_IN_PATTERN, index),
-				cl);
-		} catch (Exception e) {
-			throw new StreamTaskException(
-				String.format("Could not instantiate serializer for [%d] input.", index),
-				e);
+			InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize inputs.", e);
 		}
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+	public Input[] getInputs(ClassLoader cl) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+			Input[] inputs = InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl);
+			if (inputs == null) {
+				return new Input[0];
+			}
+			return inputs;
 		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
+			throw new StreamTaskException("Could not deserialize inputs", e);
 		}
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
-		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
+	@Deprecated
+	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		return getTypeSerializerIn(0, cl);
 	}
 
-	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
-		try {
-			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize type serializer.", e);
+	@Deprecated
+	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		return getTypeSerializerIn(1, cl);
+	}
+
+	public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
+		Input[] inputs = getInputs(cl);
+		if (index >= inputs.length) {
+			return null;

Review comment:
       It was for backwards compatibility, as this is the equivalent of the code on master. I think at least some of the tests that manually construct `StreamConfig` are failing, but I'm not sure - I don't know this code very well. I will try to track those failures down and, if they are easy to fix, replace it with a `checkState`.
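
To make the trade-off concrete, here is a small sketch contrasting the lenient lookup (return `null` when the index is out of range) with a fail-fast, `checkState`-style lookup. `InputLookupSketch` is a hypothetical class used only for illustration, and serializers are represented as plain strings rather than `TypeSerializer` instances.

```java
// Hypothetical stand-in; real code would return a TypeSerializer, not a String.
final class InputLookupSketch {
	private final String[] serializers;

	InputLookupSketch(String... serializers) {
		this.serializers = serializers;
	}

	/** Lenient variant: mirrors the "return null when out of range" behavior. */
	String getLenient(int index) {
		return index >= serializers.length ? null : serializers[index];
	}

	/** Strict variant: fails fast, as a checkState-style precondition would. */
	String getStrict(int index) {
		if (index < 0 || index >= serializers.length) {
			throw new IllegalStateException(
				String.format("No input at index %d (configured inputs: %d)", index, serializers.length));
		}
		return serializers[index];
	}
}
```

The lenient variant keeps hand-built configurations working but pushes a possible `NullPointerException` to the caller; the strict variant surfaces the misconfiguration at the lookup itself.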







[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r482016598



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -90,16 +91,19 @@
 
 	private final RecordWriterOutput<?>[] streamOutputs;
 
-	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
+	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
 
 	/**
 	 * For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing
-	 * feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and
+	 * feedback edges do not contain any operators, in which case, {@code mainOperatorWrapper} and
 	 * {@code tailOperatorWrapper} are null.
 	 */
-	@Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

Review comment:
       Yes, yes, and added more Javadoc :)







[GitHub] [flink] pnowojski merged pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski merged pull request #13234:
URL: https://github.com/apache/flink/pull/13234


   





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679234288


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit f1608e8ff1cc00fb6cf162f1320305d5f458d64b (Fri Feb 19 07:26:35 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5828",
       "triggerID" : "e1c13bbc1899c7c2d7e94d26cdeb47deddf76b50",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5847",
       "triggerID" : "9063c7aff55eee4e446dddff9d74e83ab692f3d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5890",
       "triggerID" : "705b359ef261cf3c6b1e42c418e7955c90007171",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e5a866429bf22eb6aeeb26733e5ab1d705780e66",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e5a866429bf22eb6aeeb26733e5ab1d705780e66",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 705b359ef261cf3c6b1e42c418e7955c90007171 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5890) 
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r477498178



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(
+			StreamTask<OUT, OP> containingTask,
+			StreamConfig.Input[] configuredInputs,
+			Map<Integer, StreamConfig> chainedConfigs,
+			ClassLoader userCodeClassloader,
+			List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
+		if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof SourceInput)) {
+			return new ChainedSourceOutputs();
+		}
+		checkState(
+			mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,

Review comment:
       Isn't this method called for any operator?
   The `if` check above will not return from the function if there are `NetworkInput`s configured.







[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479120389



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -628,4 +621,61 @@ public String toString() {
 
 		return builder.toString();
 	}
+
+	/**
+	 * Interface representing chained inputs.
+	 */
+	public interface InputConfig extends Serializable {
+	}
+
+	/**
+	 * A representation of a Network {@link InputConfig}.
+	 */
+	public static class NetworkInputConfig implements InputConfig {
+		private final TypeSerializer<?> typeSerializer;
+		private int inputGateIndex;
+
+		public NetworkInputConfig(TypeSerializer<?> typeSerializer, int inputGateIndex) {
+			this.typeSerializer = typeSerializer;
+			this.inputGateIndex = inputGateIndex;
+		}
+
+		public TypeSerializer<?> getTypeSerializer() {
+			return typeSerializer;
+		}
+
+		public int getInputGateIndex() {
+			return inputGateIndex;
+		}
+	}
+
+	/**
+	 * A serialized representation of an input.
+	 */
+	public static class SourceInputConfig implements InputConfig {
+		private final StreamEdge inputEdge;
+
+		public SourceInputConfig(StreamEdge inputEdge) {
+			this.inputEdge = inputEdge;
+		}
+
+		public StreamEdge getInputEdge() {
+			return inputEdge;
+		}
+
+		@Override
+		public String toString() {
+			return inputEdge.toString();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return Objects.equals(obj, inputEdge);

Review comment:
       Are we comparing the `inputEdge` (a `StreamEdge`) with a `SourceInputConfig` here?
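
The type mismatch questioned above can be avoided with a type-checked `equals` that compares the wrapped edges, plus a matching `hashCode`. This is only a minimal sketch: `SourceInputConfigSketch` and the `Edge` class are hypothetical stand-ins (not Flink's `StreamEdge`), used so the example stays self-contained.

```java
import java.util.Objects;

class SourceInputConfigSketch {

    /** Hypothetical stand-in for StreamEdge; equality is by id only. */
    public static final class Edge {
        private final int id;

        public Edge(int id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Edge && ((Edge) o).id == id;
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(id);
        }
    }

    /** Sketch of a config whose equals compares config to config, not config to edge. */
    public static final class SourceInputConfig {
        private final Edge inputEdge;

        public SourceInputConfig(Edge inputEdge) {
            this.inputEdge = inputEdge;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof SourceInputConfig)) {
                return false; // also covers null and a bare Edge
            }
            return Objects.equals(inputEdge, ((SourceInputConfig) obj).inputEdge);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(inputEdge);
        }
    }
}
```

With this shape, two configs wrapping equal edges are equal, while a config never equals a bare edge, which keeps `equals` symmetric.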







[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r477487377



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       This class duplicates large parts of `ChainingOutput`.
   Why not reuse it by extending or delegating to it?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -628,4 +622,46 @@ public String toString() {
 
 		return builder.toString();
 	}
+
+	/**
+	 * Interface representing chained inputs.
+	 */
+	public static class Input implements Serializable {

Review comment:
       1. Rename `Input` to `InputConfig` as there is `..api.operators.Input` already? (and subclasses)
   2. Make it `interface` and not `class`? This will allow children to extend other classes and ease testing
   3. Why is this class not parameterized? Its serializer is passed to parameterized classes
   
   

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {

Review comment:
       nit: indentation

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       `Objects.equals`? 
   I guess we should also process the record if both tags are null.
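
To make the difference concrete, the two guards can be compared side by side. This is a hedged sketch only: `SideOutputGuardSketch` is a hypothetical class, and plain `String`s stand in for `OutputTag` instances. Whether forwarding is actually desired when both tags are null is the open question raised above, not something this sketch settles.

```java
import java.util.Objects;

class SideOutputGuardSketch {

    /** Guard as written in the diff: never forwards when the own tag is null. */
    static boolean forwardsWithOriginalGuard(String ownTag, String recordTag) {
        if (ownTag == null || !ownTag.equals(recordTag)) {
            return false; // record is dropped
        }
        return true;
    }

    /** Null-safe guard: Objects.equals(null, null) is true, so two null tags match. */
    static boolean forwardsWithObjectsEquals(String ownTag, String recordTag) {
        return Objects.equals(ownTag, recordTag);
    }
}
```

The two variants agree whenever the own tag is non-null; they differ only in the both-null case.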

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(
+			StreamTask<OUT, OP> containingTask,
+			StreamConfig.Input[] configuredInputs,
+			Map<Integer, StreamConfig> chainedConfigs,
+			ClassLoader userCodeClassloader,
+			List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
+		if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof SourceInput)) {
+			return new ChainedSourceOutputs();
+		}
+		checkState(
+			mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,

Review comment:
       Isn't this method called for any operator?
   The check will not return if there are `NetworkInput`s configured.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() {
 		}
 	}
 
-	public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
-		config.setInteger(TYPE_SERIALIZERS_IN_COUNT, serializers.length);
-		for (int i = 0; i < serializers.length; i++) {
-			setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), serializers[i]);
-		}
-	}
-
 	public void setTypeSerializerOut(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
 
+	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
+	}
+
 	public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
 	}
 
-	@Deprecated
-	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-		return getTypeSerializerIn(0, cl);
+	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
+		try {
+			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize type serializer.", e);
+		}
 	}
 
-	@Deprecated
-	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-		return getTypeSerializerIn(1, cl);
+	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
+		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
 	}
 
-	public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) {
-		int typeSerializersCount = config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1);
-		checkState(
-			typeSerializersCount >= 0,
-			"Missing value for %s in the config? [%d]",
-			TYPE_SERIALIZERS_IN_COUNT,
-			typeSerializersCount);
-		TypeSerializer<?>[] typeSerializers = new TypeSerializer<?>[typeSerializersCount];
-		for (int i = 0; i < typeSerializers.length; i++) {
-			typeSerializers[i] = getTypeSerializerIn(i, cl);
+	public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
+		Input[] inputs = new Input[serializers.length];
+		for (int i = 0; i < serializers.length; i++) {
+			inputs[i] = new NetworkInput(serializers[i], i);
 		}
-		return typeSerializers;
+		setInputs(inputs);
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
+	public void setInputs(Input ...inputs) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(
-				this.config,
-				String.format(TYPE_SERIALIZERS_IN_PATTERN, index),
-				cl);
-		} catch (Exception e) {
-			throw new StreamTaskException(
-				String.format("Could not instantiate serializer for [%d] input.", index),
-				e);
+			InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize inputs.", e);
 		}
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+	public Input[] getInputs(ClassLoader cl) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+			Input[] inputs = InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl);
+			if (inputs == null) {
+				return new Input[0];
+			}
+			return inputs;
 		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
+			throw new StreamTaskException("Could not deserialize inputs", e);
 		}
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
-		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
+	@Deprecated
+	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		return getTypeSerializerIn(0, cl);
 	}
 
-	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
-		try {
-			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize type serializer.", e);
+	@Deprecated
+	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		return getTypeSerializerIn(1, cl);
+	}
+
+	public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
+		Input[] inputs = getInputs(cl);
+		if (index >= inputs.length) {
+			return null;

Review comment:
       Why can this happen?
   Should we return `Optional` or mark with `@Nullable`?
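
As a rough illustration of the `Optional` alternative, an out-of-range index can be reported as an empty result instead of a bare `null`. This is a sketch under stated assumptions: `SerializerLookupSketch` and `serializerAt` are hypothetical names, and `String`s stand in for `TypeSerializer` instances.

```java
import java.util.Optional;

class SerializerLookupSketch {

    /** Returns the entry at index, or Optional.empty() when the index is out of range. */
    static Optional<String> serializerAt(String[] serializers, int index) {
        if (index < 0 || index >= serializers.length) {
            return Optional.empty();
        }
        return Optional.ofNullable(serializers[index]);
    }
}
```

A caller then has to decide explicitly what a missing input means, for example via `orElseThrow`, rather than risk an unnoticed `null`.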

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() {
 		}
 	}
 
-	public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
-		config.setInteger(TYPE_SERIALIZERS_IN_COUNT, serializers.length);
-		for (int i = 0; i < serializers.length; i++) {
-			setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), serializers[i]);
-		}
-	}
-
 	public void setTypeSerializerOut(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
 
+	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
+	}
+
 	public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
 	}
 
-	@Deprecated
-	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-		return getTypeSerializerIn(0, cl);
+	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
+		try {
+			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize type serializer.", e);
+		}
 	}
 
-	@Deprecated
-	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-		return getTypeSerializerIn(1, cl);
+	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
+		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
 	}
 
-	public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) {
-		int typeSerializersCount = config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1);
-		checkState(
-			typeSerializersCount >= 0,
-			"Missing value for %s in the config? [%d]",
-			TYPE_SERIALIZERS_IN_COUNT,
-			typeSerializersCount);
-		TypeSerializer<?>[] typeSerializers = new TypeSerializer<?>[typeSerializersCount];
-		for (int i = 0; i < typeSerializers.length; i++) {
-			typeSerializers[i] = getTypeSerializerIn(i, cl);
+	public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
+		Input[] inputs = new Input[serializers.length];
+		for (int i = 0; i < serializers.length; i++) {
+			inputs[i] = new NetworkInput(serializers[i], i);
 		}
-		return typeSerializers;
+		setInputs(inputs);
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
+	public void setInputs(Input ...inputs) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(
-				this.config,
-				String.format(TYPE_SERIALIZERS_IN_PATTERN, index),
-				cl);
-		} catch (Exception e) {
-			throw new StreamTaskException(
-				String.format("Could not instantiate serializer for [%d] input.", index),
-				e);
+			InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize inputs.", e);
 		}
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+	public Input[] getInputs(ClassLoader cl) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+			Input[] inputs = InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl);
+			if (inputs == null) {
+				return new Input[0];
+			}
+			return inputs;
 		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
+			throw new StreamTaskException("Could not deserialize inputs", e);
 		}
 	}
 
-	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
-		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
+	@Deprecated
+	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		return getTypeSerializerIn(0, cl);
 	}
 
-	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
-		try {
-			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize type serializer.", e);
+	@Deprecated
+	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		return getTypeSerializerIn(1, cl);
+	}
+
+	public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
+		Input[] inputs = getInputs(cl);
+		if (index >= inputs.length) {
+			return null;
 		}
+		checkState(inputs[index] instanceof NetworkInput, "Input [%d] was assumed to be network input", index);

Review comment:
       I think `checkState` only recognizes the `%s` placeholder.
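
The effect can be shown with a simplified lenient formatter that substitutes only `%s`. This is a sketch of Guava-style lenient formatting, which `checkState` is assumed to follow; `LenientFormatSketch` is a hypothetical class and not Flink's actual implementation.

```java
class LenientFormatSketch {

    /**
     * Replaces each "%s" with the next argument, in order. Other specifiers
     * such as "%d" are left untouched, and unused arguments are appended
     * in square brackets at the end.
     */
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int start = 0;
        int i = 0;
        while (i < args.length) {
            int idx = template.indexOf("%s", start);
            if (idx == -1) {
                break; // no more placeholders to fill
            }
            sb.append(template, start, idx).append(args[i++]);
            start = idx + 2;
        }
        sb.append(template.substring(start));
        if (i < args.length) {
            sb.append(" [").append(args[i++]);
            while (i < args.length) {
                sb.append(", ").append(args[i++]);
            }
            sb.append(']');
        }
        return sb.toString();
    }
}
```

Under these assumptions, a `%d` template keeps the literal `%d` in the message and tacks the index on at the end, whereas the `%s` template puts the index where it was intended.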

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -53,42 +53,48 @@ public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
 		ClassLoader userClassLoader = getUserCodeClassLoader();
 
-		TypeSerializer<?>[] inputDeserializers = configuration.getTypeSerializersIn(userClassLoader);
+		StreamConfig.Input[] inputs = configuration.getInputs(userClassLoader);
 
-		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[inputDeserializers.length];
-		WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputDeserializers.length];
+		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[
+			(int) Arrays.stream(inputs)
+				.filter(input -> (input instanceof StreamConfig.NetworkInput))
+				.count()];

Review comment:
       Isn't it the same as `configuration.getNumberOfNetworkInputs` below?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -547,302 +675,38 @@ private void linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW
 		return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
 	}
 
-	// ------------------------------------------------------------------------
-	//  Collectors for output chaining
-	// ------------------------------------------------------------------------
-
 	/**
-	 * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}.
-	 *
-	 * @param <T> The type of the elements that can be emitted.
+	 * Wrapper class to access the chained sources and their's outputs.
 	 */
-	public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
-		Gauge<Long> getWatermarkGauge();
-	}
-
-	static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
-
-		protected final OneInputStreamOperator<T, ?> operator;
-		protected final Counter numRecordsIn;
-		protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
-
-		protected final StreamStatusProvider streamStatusProvider;
-
-		@Nullable
-		protected final OutputTag<T> outputTag;
-
-		public ChainingOutput(
-				OneInputStreamOperator<T, ?> operator,
-				StreamStatusProvider streamStatusProvider,
-				@Nullable OutputTag<T> outputTag) {
-			this.operator = operator;
-
-			{
-				Counter tmpNumRecordsIn;
-				try {
-					OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
-					tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
-				} catch (Exception e) {
-					LOG.warn("An exception occurred during the metrics setup.", e);
-					tmpNumRecordsIn = new SimpleCounter();
-				}
-				numRecordsIn = tmpNumRecordsIn;
-			}
+	public static class ChainedSourceOutputs {
+		private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs;
+		private final Map<Integer, SourceOperator<?, ?>> sourceOperators;

Review comment:
       I think having a single map at the `OperatorChain` level, with values holding an Output + Operator pair, would be easier to read and more efficient (by pair I mean a class, maybe named `ChainedSourceOutputs`).
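
A rough sketch of what this suggestion might look like, under stated assumptions: `SketchOutput` and `SketchSourceOperator` are placeholder types standing in for the real output and operator classes, and the pair class name `ChainedSource` is hypothetical. It is not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class ChainedSourceMapSketch {

    // Placeholder for the real output type (assumption, not a Flink class).
    static final class SketchOutput {
        final String name;

        SketchOutput(String name) {
            this.name = name;
        }
    }

    // Placeholder for the real source operator type (assumption, not a Flink class).
    static final class SketchSourceOperator {
        final String name;

        SketchSourceOperator(String name) {
            this.name = name;
        }
    }

    // The "pair" class: one value object per chained source.
    static final class ChainedSource {
        private final SketchOutput output;
        private final SketchSourceOperator operator;

        ChainedSource(SketchOutput output, SketchSourceOperator operator) {
            this.output = output;
            this.operator = operator;
        }

        SketchOutput getOutput() {
            return output;
        }

        SketchSourceOperator getOperator() {
            return operator;
        }
    }

    // One map instead of two parallel maps: a single lookup yields both parts.
    private final Map<Integer, ChainedSource> chainedSources = new HashMap<>();

    void register(int inputIndex, SketchOutput output, SketchSourceOperator operator) {
        chainedSources.put(inputIndex, new ChainedSource(output, operator));
    }

    ChainedSource get(int inputIndex) {
        return chainedSources.get(inputIndex);
    }
}
```

With a single map, the output and the operator for a given input cannot get out of sync, which two parallel maps do not guarantee.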

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(

Review comment:
       Something is wrong with the return type or the name here :)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -547,302 +675,38 @@ private void linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW
 		return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
 	}
 
-	// ------------------------------------------------------------------------
-	//  Collectors for output chaining
-	// ------------------------------------------------------------------------
-
 	/**
-	 * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}.
-	 *
-	 * @param <T> The type of the elements that can be emitted.
+	 * Wrapper class to access the chained sources and their's outputs.
 	 */
-	public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
-		Gauge<Long> getWatermarkGauge();
-	}
-
-	static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
-
-		protected final OneInputStreamOperator<T, ?> operator;
-		protected final Counter numRecordsIn;
-		protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
-
-		protected final StreamStatusProvider streamStatusProvider;
-
-		@Nullable
-		protected final OutputTag<T> outputTag;
-
-		public ChainingOutput(
-				OneInputStreamOperator<T, ?> operator,
-				StreamStatusProvider streamStatusProvider,
-				@Nullable OutputTag<T> outputTag) {
-			this.operator = operator;
-
-			{
-				Counter tmpNumRecordsIn;
-				try {
-					OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
-					tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
-				} catch (Exception e) {
-					LOG.warn("An exception occurred during the metrics setup.", e);
-					tmpNumRecordsIn = new SimpleCounter();
-				}
-				numRecordsIn = tmpNumRecordsIn;
-			}
+	public static class ChainedSourceOutputs {
+		private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs;
+		private final Map<Integer, SourceOperator<?, ?>> sourceOperators;

Review comment:
       The map key is the index of the input in `StreamConfig`, right? This seems a bit fragile and not obvious.
   How about keying by `Input` instances instead? I see we have them available on both put and get.
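
The difference can be sketched as follows. `SketchInput` is a placeholder, not Flink's `Input`, and the `String` value stands in for the real output type. The placeholder relies on identity equality, so this only works when the same instance is used on put and get, which is what the comment assumes.

```java
import java.util.HashMap;
import java.util.Map;

final class InputKeyedLookupSketch {

    // Placeholder for an input descriptor (assumption). It does not override
    // equals/hashCode, so the map below compares keys by identity.
    static final class SketchInput {
        final String description;

        SketchInput(String description) {
            this.description = description;
        }
    }

    // Keyed by the descriptor itself rather than by its position in an array.
    private final Map<SketchInput, String> outputsByInput = new HashMap<>();

    void put(SketchInput input, String output) {
        outputsByInput.put(input, output);
    }

    // Fails loudly for unknown inputs instead of silently returning null.
    String get(SketchInput input) {
        String output = outputsByInput.get(input);
        if (output == null) {
            throw new IllegalArgumentException(
                "No chained source registered for input: " + input.description);
        }
        return output;
    }
}
```

A lookup by instance does not depend on the order of the inputs, so reordering or filtering the input array would not silently point at the wrong entry.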

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -90,16 +91,19 @@
 
 	private final RecordWriterOutput<?>[] streamOutputs;
 
-	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
+	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
 
 	/**
 	 * For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing
-	 * feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and
+	 * feedback edges do not contain any operators, in which case, {@code mainOperatorWrapper} and
 	 * {@code tailOperatorWrapper} are null.
 	 */
-	@Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

Review comment:
       Why do we need `first`? Isn't it the same as `main`?







[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478502625



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       The only part that could be deduplicated easily is the two trivial `collect` methods (3 lines of code each). The other methods are only partially duplicated, and avoiding the duplication there would introduce even more abstract methods and more mangled code.
   
   I think complicating the hierarchy to avoid this duplication would do more harm than good, especially since `ChainingOutput` hasn't changed since 2017.







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 36d937d37f057632e795e7a7f4c477641bfb6c1e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6100) 
   * f1608e8ff1cc00fb6cf162f1320305d5f458d64b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 36d937d37f057632e795e7a7f4c477641bfb6c1e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6100) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r481240840



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       If someone would like to pass `outputTag = null`, there is the other `collect(record)` method for that purpose. In Flink we have a convention that everything is `@NonNull` unless marked otherwise (or someone forgot the `@Nullable` annotation).
   
   Also note that this is a pre-existing problem, assuming we can agree that it is a problem.
   
   Also, the null check is covered by `org.apache.flink.util.OutputTag#equals` 😸.
   
   Regardless, I've extracted an `OutputTag#isResponsibleFor` method; I'm not sure how worthwhile this was.
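
For illustration only, here is one way such a helper could behave. `SketchTag` is a placeholder with value-based equality, not `org.apache.flink.util.OutputTag`, and the signature of `isResponsibleFor` below is an assumption, since the thread does not show the extracted method.

```java
import java.util.Objects;

final class OutputTagResponsibilitySketch {

    // Minimal placeholder for an output tag with value-based equality (assumption).
    static final class SketchTag {
        private final String id;

        SketchTag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so a null argument never matches.
            return obj instanceof SketchTag && ((SketchTag) obj).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    // Hypothetical helper: 'owner' may be null (the output handles the main
    // stream), while 'other' is expected to be non-null by convention.
    static boolean isResponsibleFor(SketchTag owner, SketchTag other) {
        // Calling equals on the non-null side covers the null-owner case.
        return other.equals(owner);
    }
}
```

In this sketch, a side-output `collect` would forward the record only when `isResponsibleFor(this.outputTag, outputTag)` holds, folding the two conditions from the diff into a single call.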







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * 9063c7aff55eee4e446dddff9d74e83ab692f3d4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5847) 
   





[GitHub] [flink] pnowojski commented on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-682038680


   Thank you for the review @rkhachatryan, I've updated the PR. Could you take another look?





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478469117



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -628,4 +622,46 @@ public String toString() {
 
 		return builder.toString();
 	}
+
+	/**
+	 * Interface representing chained inputs.
+	 */
+	public static class Input implements Serializable {

Review comment:
       1. and 2. 👍 - I guess it's a class by accident.
   
   3. I don't think we can parametrise it with anything besides `<?>` in practice. Secondly, `SourceInput` doesn't have any type parameter, so parametrising `Input` wouldn't make much sense.
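
A hedged sketch of the shape being discussed: `Input` as a non-generic marker interface with two implementations. The fields (`serializerName`, `sourceName`) and the `describe` helper are hypothetical placeholders chosen for illustration; the actual contents of `NetworkInput` and `SourceInput` are not shown in this thread.

```java
import java.io.Serializable;

final class ChainedInputTypesSketch {

    // Marker interface instead of a concrete base class; deliberately not generic.
    interface Input extends Serializable {}

    // A network input carries serializer information; a String stands in for it here.
    static final class NetworkInput implements Input {
        private static final long serialVersionUID = 1L;
        final String serializerName;

        NetworkInput(String serializerName) {
            this.serializerName = serializerName;
        }
    }

    // A chained source input has no element type to expose, hence no type parameter.
    static final class SourceInput implements Input {
        private static final long serialVersionUID = 1L;
        final String sourceName;

        SourceInput(String sourceName) {
            this.sourceName = sourceName;
        }
    }

    // Callers dispatch on the concrete type, as the instanceof filter in the diff does.
    static String describe(Input input) {
        if (input instanceof NetworkInput) {
            return "network(" + ((NetworkInput) input).serializerName + ")";
        }
        if (input instanceof SourceInput) {
            return "source(" + ((SourceInput) input).sourceName + ")";
        }
        throw new IllegalArgumentException("Unknown input type: " + input.getClass());
    }
}
```

Because a mixed array of inputs can only be typed as `Input<?>[]` if the interface were generic, a type parameter would add little here, which matches the argument in point 3.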







[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478513685



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -547,302 +675,38 @@ private void linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW
 		return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
 	}
 
-	// ------------------------------------------------------------------------
-	//  Collectors for output chaining
-	// ------------------------------------------------------------------------
-
 	/**
-	 * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}.
-	 *
-	 * @param <T> The type of the elements that can be emitted.
+	 * Wrapper class to access the chained sources and their's outputs.
 	 */
-	public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
-		Gauge<Long> getWatermarkGauge();
-	}
-
-	static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
-
-		protected final OneInputStreamOperator<T, ?> operator;
-		protected final Counter numRecordsIn;
-		protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
-
-		protected final StreamStatusProvider streamStatusProvider;
-
-		@Nullable
-		protected final OutputTag<T> outputTag;
-
-		public ChainingOutput(
-				OneInputStreamOperator<T, ?> operator,
-				StreamStatusProvider streamStatusProvider,
-				@Nullable OutputTag<T> outputTag) {
-			this.operator = operator;
-
-			{
-				Counter tmpNumRecordsIn;
-				try {
-					OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
-					tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
-				} catch (Exception e) {
-					LOG.warn("An exception occurred during the metrics setup.", e);
-					tmpNumRecordsIn = new SimpleCounter();
-				}
-				numRecordsIn = tmpNumRecordsIn;
-			}
+	public static class ChainedSourceOutputs {
+		private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs;
+		private final Map<Integer, SourceOperator<?, ?>> sourceOperators;

Review comment:
       Good idea with `Input` - I also didn't like the integer. As for moving the `Map` to an upper level, I don't mind either way, so let it be as you prefer.







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 1b6142f9dad4b3893629b2ec774c93adb5875e84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5892) 
   * 2382ecb7dcb679dddbc39f44717ed1c4c7c061cf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5935) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478505421



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(

Review comment:
       😄 
   It makes sense: the method creates the chained inputs, and its purpose is to return a collection of those chained sources' outputs. But I can rename `ChainedSourceOutputs` to `ChainedSources`.





[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r481246654



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       > In Flink we have convention that everything is @NonNull 
   
   Why are there so many explicit @NonNull annotations and null checks then? :)
   
   > Also, the null check is covered by org.apache.flink.util.OutputTag#equals 😸.
   
   It drops the record instead of throwing an exception.
   
   But ok, let's leave it as is.





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 2382ecb7dcb679dddbc39f44717ed1c4c7c061cf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5935) 
   

[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 36d937d37f057632e795e7a7f4c477641bfb6c1e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6100) 
   * f1608e8ff1cc00fb6cf162f1320305d5f458d64b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6153) 
   

[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479107582



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       I ran a diff on them, and the only differences I see are `setKeyContextElement1` vs `setKeyContextElement` and `close`.
   
   I think the root cause is that `Input` duplicates `OneInputStreamOperator`, and the proper solution would be to refactor `SourceOperator`, but that's apparently out of scope.
   
   Instead, we could use an adapter from operator to input and use only `MultipleInputChainingOutput`, like this:
   ```
   // instead of new ChainingOutput()
   currentOperatorOutput = new MultipleInputChainingOutput<>(Input.from(operator), ((OperatorMetricGroup) operator.getMetricGroup()), this, outputTag) {};
   // where Input.from returns some Input delegating to operator
   ```
   (I think the JVM will eliminate the delegation overhead, but that can be tested.)
   
   Just a suggestion.
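
The adapter idea in the comment above could look roughly like the following minimal sketch. It does not use Flink's real interfaces: `OneInputOperator`, `Input`, `inputFrom` and `RecordingOperator` are simplified, hypothetical stand-ins introduced only for illustration, and the only detail taken from the discussion is the `setKeyContextElement1` vs `setKeyContextElement` naming difference.

```java
import java.util.ArrayList;
import java.util.List;

class InputAdapterSketch {

    /** Hypothetical, simplified single-input operator (not Flink's interface). */
    interface OneInputOperator<T> {
        void processElement(T element);

        void setKeyContextElement1(T element);
    }

    /** Hypothetical, simplified input (not Flink's interface). */
    interface Input<T> {
        void processElement(T element);

        void setKeyContextElement(T element);
    }

    /** Adapter: exposes an operator as an Input by delegating every call. */
    static <T> Input<T> inputFrom(OneInputOperator<T> operator) {
        return new Input<T>() {
            @Override
            public void processElement(T element) {
                operator.processElement(element);
            }

            @Override
            public void setKeyContextElement(T element) {
                // The two method names differ, which is what the adapter hides.
                operator.setKeyContextElement1(element);
            }
        };
    }

    /** Test double that records which operator methods were invoked. */
    static final class RecordingOperator implements OneInputOperator<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void processElement(String element) {
            calls.add("process:" + element);
        }

        @Override
        public void setKeyContextElement1(String element) {
            calls.add("key:" + element);
        }
    }

    public static void main(String[] args) {
        RecordingOperator operator = new RecordingOperator();
        Input<String> input = inputFrom(operator);
        input.setKeyContextElement("a");
        input.processElement("a");
        System.out.println(operator.calls);
    }
}
```

With such an adapter a single output implementation would only need to know about `Input`. Whether the extra delegation is free in practice would still have to be measured, as the comment notes.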





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 1b6142f9dad4b3893629b2ec774c93adb5875e84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5892) 
   * 2382ecb7dcb679dddbc39f44717ed1c4c7c061cf UNKNOWN
   

[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * 705b359ef261cf3c6b1e42c418e7955c90007171 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5890) 
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 1b6142f9dad4b3893629b2ec774c93adb5875e84 UNKNOWN
   

[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479061227



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       It's unclear to me what the contract and the intended behavior are.
   If
   > outputTag is @ NonNull (by default)
   
   then we should probably check for it.
   Currently, we just drop the record.
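
To make the two possible contracts concrete, here is a minimal sketch that contrasts silently dropping a record with failing fast on a null tag. `Tag`, `Output`, `collectLenient` and `collectStrict` are hypothetical names for illustration, not Flink classes; the lenient method mirrors the shape of the condition in the diff above, and the strict variant is only one way the check could be written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class TaggedOutputSketch {

    /** Hypothetical stand-in for a side-output tag; equality is based on the id only. */
    static final class Tag {
        final String id;

        Tag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Tag && ((Tag) other).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Hypothetical output that stores only records emitted under its own tag. */
    static final class Output {
        private final Tag ownTag; // null would mean "main output" in this sketch
        final List<String> received = new ArrayList<>();

        Output(Tag ownTag) {
            this.ownTag = ownTag;
        }

        /** Lenient contract: a null or non-matching tag silently drops the record. */
        void collectLenient(Tag tag, String record) {
            if (ownTag == null || !ownTag.equals(tag)) {
                return;
            }
            received.add(record);
        }

        /** Strict contract: a null tag is treated as a caller bug and fails fast. */
        void collectStrict(Tag tag, String record) {
            Objects.requireNonNull(tag, "outputTag must not be null");
            collectLenient(tag, record);
        }
    }

    public static void main(String[] args) {
        Output output = new Output(new Tag("late-data"));
        output.collectLenient(null, "dropped silently");
        output.collectStrict(new Tag("late-data"), "forwarded");
        System.out.println(output.received);
        try {
            output.collectStrict(null, "never stored");
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The lenient variant keeps the hot path free of an extra check, while the strict one surfaces a violated non-null assumption immediately instead of losing data quietly.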





[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r482281944



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -90,16 +91,19 @@
 
 	private final RecordWriterOutput<?>[] streamOutputs;
 
-	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
+	private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
 
 	/**
 	 * For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing
-	 * feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and
+	 * feedback edges do not contain any operators, in which case, {@code mainOperatorWrapper} and
 	 * {@code tailOperatorWrapper} are null.
 	 */
-	@Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
+	@Nullable private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

Review comment:
       Thanks!





[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479113108



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-		linkOperatorWrappers(allOperatorWrappers);
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+		List<StreamEdge> outEdgesInOrder,
+		RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+		Map<Integer, StreamConfig> chainedConfigs,
+		StreamTask<OUT, OP> containingTask,
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private ChainedSourceOutputs createChainedInputs(

Review comment:
       Given the new signature: `Map<SourceInputConfig, ChainedSource> createChainedInputs`,
   nit: `createChainedSources`?





[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 2382ecb7dcb679dddbc39f44717ed1c4c7c061cf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5935) 
   * 36d937d37f057632e795e7a7f4c477641bfb6c1e UNKNOWN
   





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478503638



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -53,42 +53,48 @@ public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
 		ClassLoader userClassLoader = getUserCodeClassLoader();
 
-		TypeSerializer<?>[] inputDeserializers = configuration.getTypeSerializersIn(userClassLoader);
+		StreamConfig.Input[] inputs = configuration.getInputs(userClassLoader);
 
-		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[inputDeserializers.length];
-		WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputDeserializers.length];
+		ArrayList<IndexedInputGate>[] inputLists = new ArrayList[
+			(int) Arrays.stream(inputs)
+				.filter(input -> (input instanceof StreamConfig.NetworkInput))
+				.count()];

Review comment:
       yes, it should :)
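
The sizing expression in the hunk above counts only the inputs that are network inputs, so chained sources do not get an input gate list. A minimal standalone sketch of that counting pattern follows. The `Input`, `NetworkInput` and `SourceInput` types here are hypothetical stand-ins for the `StreamConfig` classes, not the real Flink API.

```java
import java.util.Arrays;

public class NetworkInputCountSketch {

    /** Hypothetical stand-in for StreamConfig.Input. */
    interface Input {}

    /** Hypothetical stand-in for an input that is read from the network. */
    static final class NetworkInput implements Input {}

    /** Hypothetical stand-in for an input backed by a chained source. */
    static final class SourceInput implements Input {}

    /** Counts only the network inputs, mirroring the filter/count in the diff. */
    static int countNetworkInputs(Input[] inputs) {
        return (int) Arrays.stream(inputs)
            .filter(input -> input instanceof NetworkInput)
            .count();
    }

    public static void main(String[] args) {
        Input[] inputs = {new NetworkInput(), new SourceInput(), new NetworkInput()};
        // Only the two NetworkInput instances are counted.
        System.out.println(countNetworkInputs(inputs));
    }
}
```

Extracting the count into a named helper, as done here, is only one option for keeping the array allocation readable.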







[GitHub] [flink] flinkbot edited a comment on pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13234:
URL: https://github.com/apache/flink/pull/13234#issuecomment-679243226


   ## CI report:
   
   * e5a866429bf22eb6aeeb26733e5ab1d705780e66 UNKNOWN
   * 1b6142f9dad4b3893629b2ec774c93adb5875e84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5892) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r478496041



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+	protected final Input<T> input;
+	protected final Counter numRecordsIn;
+	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+	protected final StreamStatusProvider streamStatusProvider;
+	@Nullable protected final OutputTag<T> outputTag;
+
+	public MultipleInputChainingOutput(
+			Input<T> input,
+			OperatorMetricGroup operatorMetricGroup,
+			StreamStatusProvider streamStatusProvider,
+			@Nullable OutputTag<T> outputTag) {
+		this.input = input;
+
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
+		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		if (this.outputTag != null) {
+			// we are not responsible for emitting to the main output.
+			return;
+		}
+
+		pushToOperator(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
       In that case the user should call `Output.collect(record)`, since `outputTag` is `@NonNull` (by default).
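
The filtering in the two `collect` overloads can be illustrated with a small standalone sketch. It assumes the truncated branch in the hunk above simply returns, as the main-output overload does. `Tag` and `FilteringOutput` are hypothetical simplifications, not Flink's `OutputTag` or `MultipleInputChainingOutput`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ChainingOutputSketch {

    /** Hypothetical stand-in for OutputTag; equality is based on the id only. */
    static final class Tag {
        final String id;

        Tag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Simplified output that forwards a record only if it matches its own tag. */
    static final class FilteringOutput {
        private final Tag outputTag; // null means this edge carries the main output
        final List<String> received = new ArrayList<>();

        FilteringOutput(Tag outputTag) {
            this.outputTag = outputTag;
        }

        /** Main-output record: dropped if this edge belongs to a side output. */
        void collect(String record) {
            if (outputTag != null) {
                return;
            }
            received.add(record);
        }

        /** Side-output record: dropped unless the tags match (assumed behavior). */
        void collect(Tag tag, String record) {
            if (outputTag == null || !outputTag.equals(tag)) {
                return;
            }
            received.add(record);
        }
    }

    public static void main(String[] args) {
        FilteringOutput mainOut = new FilteringOutput(null);
        FilteringOutput lateOut = new FilteringOutput(new Tag("late"));
        for (FilteringOutput out : List.of(mainOut, lateOut)) {
            out.collect("a");                    // main-output record
            out.collect(new Tag("late"), "b");   // side-output record for "late"
            out.collect(new Tag("other"), "c");  // side-output record for another tag
        }
        System.out.println(mainOut.received);
        System.out.println(lateOut.received);
    }
}
```

In this sketch a record emitted without a tag only reaches the edge built with a `null` tag, which matches the point that main-output records go through `Output.collect(record)` rather than the tagged overload.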







[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479150979



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +212,97 @@ public OperatorChain(
 	OperatorChain(
 			List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
 			RecordWriterOutput<?>[] streamOutputs,
-			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
-			StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput,
+			StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
 		this.streamOutputs = checkNotNull(streamOutputs);
-		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
 		this.operatorEventDispatcher = null;
 
 		checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-		this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+		this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
 		this.tailOperatorWrapper = allOperatorWrappers.get(0);
 		this.numOperators = allOperatorWrappers.size();
+		this.chainedSources = Collections.emptyMap();
+
+		firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+	}
+
+	private void createChainOutputs(
+			List<StreamEdge> outEdgesInOrder,
+			RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
+			Map<Integer, StreamConfig> chainedConfigs,
+			StreamTask<OUT, OP> containingTask,
+			Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				recordWriterDelegate.getRecordWriter(i),
+				outEdge,
+				chainedConfigs.get(outEdge.getSourceId()),
+				containingTask.getEnvironment());
+
+			this.streamOutputs[i] = streamOutput;
+			streamOutputMap.put(outEdge, streamOutput);
+		}
+	}
+
+	private Map<SourceInputConfig, ChainedSource> createChainedInputs(

Review comment:
       This method as well as `createChainedInputs` is full of compiler warnings too - can you fix them?



