Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/22 07:19:21 UTC

[GitHub] [flink] TsReaper opened a new pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

TsReaper opened a new pull request #13742:
URL: https://github.com/apache/flink/pull/13742


   ## What is the purpose of the change
   
   As multiple input exec nodes have been introduced, we're going to construct multiple input operators with a new construction algorithm. This PR introduces that algorithm.
   
   Multiple input optimization is currently not used by default and is only used in tests, as its operator is not ready yet. We'll change this once the multiple input operator is ready.
   
   ## Brief change log
   
    - Introduce multi-input operator construction algorithm
   
   ## Verifying this change
   
   This change adds tests and can be verified by running the newly added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r512062541



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -88,41 +76,58 @@
  *
  * <p>This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates
  * that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates
- * a deadlock, and we resolve such deadlock by inserting a {@link BatchExecExchange} with batch shuffle mode.
+ * a deadlock, and different subclasses of this class resolve the conflict in different ways.
  *
  * <p>For a detailed explanation of the algorithm, see appendix of the
  * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
  */
 @Internal
-public class InputPriorityConflictResolver {
+public abstract class AbstractInputPriorityConflictResolver {
 
 	private final List<ExecNode<?, ?>> roots;
+	private final Set<ExecNode<?, ?>> boundaries;
+	private final ExecEdge.DamBehavior safeDamBehavior;
 
-	private TopologyGraph graph;
+	protected TopologyGraph graph;
 
-	public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+	/**
+	 * Create an {@link AbstractInputPriorityConflictResolver} for the given {@link ExecNode} sub-graph.
+	 *
+	 * @param roots the first layer of nodes on the output side of the sub-graph
+	 * @param boundaries the first layer of nodes on the input side of the sub-graph
+	 * @param safeDamBehavior when checking for conflicts we'll ignore the edges with
+	 *                        {@link ExecEdge.DamBehavior} stricter than or equal to this
+	 */
+	public AbstractInputPriorityConflictResolver(
+			List<ExecNode<?, ?>> roots,
+			Set<ExecNode<?, ?>> boundaries,
+			ExecEdge.DamBehavior safeDamBehavior) {
 		Preconditions.checkArgument(
 			roots.stream().allMatch(root -> root instanceof BatchExecNode),
 			"InputPriorityConflictResolver can only be used for batch jobs.");
 		this.roots = roots;
+		this.boundaries = boundaries;
+		this.safeDamBehavior = safeDamBehavior;
 	}
 
-	public void detectAndResolve() {
+	protected void createTopologyGraphAndResolveConflict() {

Review comment:
       createTopologyGraph?
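The `safeDamBehavior` parameter documented in this hunk means that edges whose dam behavior is at least as strict as the threshold are skipped when checking for conflicts. A minimal sketch of that filter, assuming a hypothetical `DamBehaviorSketch` enum ordered from least to most strict (`PIPELINED`, `END_INPUT`, `BLOCKING`); the enum and the `SafeDamFilter` helper are illustrative stand-ins, not the planner's `ExecEdge.DamBehavior` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a dam behavior, declared from least to most strict. */
enum DamBehaviorSketch {
    PIPELINED, // records flow through immediately
    END_INPUT, // output starts only after the input has ended
    BLOCKING;  // the input is fully materialized before being consumed

    /** Returns true if this behavior is stricter than or equal to {@code other}. */
    boolean stricterOrEqual(DamBehaviorSketch other) {
        return ordinal() >= other.ordinal();
    }
}

/** Hypothetical helper showing which edges a conflict check would skip. */
final class SafeDamFilter {
    private SafeDamFilter() {}

    /** An edge is ignored if its dam is at least as strict as the safe one. */
    static boolean shouldIgnoreEdge(DamBehaviorSketch edgeDam, DamBehaviorSketch safeDam) {
        return edgeDam.stricterOrEqual(safeDam);
    }

    /** Keeps only the edges (given by their dam behaviors) that still need a conflict check. */
    static List<DamBehaviorSketch> edgesToCheck(List<DamBehaviorSketch> edges, DamBehaviorSketch safeDam) {
        List<DamBehaviorSketch> result = new ArrayList<>();
        for (DamBehaviorSketch dam : edges) {
            if (!shouldIgnoreEdge(dam, safeDam)) {
                result.add(dam);
            }
        }
        return result;
    }
}
```

With `END_INPUT` as the safe behavior, only `PIPELINED` edges remain subject to the check in this sketch.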

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -88,41 +76,58 @@
  *
  * <p>This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates
  * that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates
- * a deadlock, and we resolve such deadlock by inserting a {@link BatchExecExchange} with batch shuffle mode.
+ * a deadlock, and different subclasses of this class resolve the conflict in different ways.
  *
  * <p>For a detailed explanation of the algorithm, see appendix of the
  * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
  */
 @Internal
-public class InputPriorityConflictResolver {
+public abstract class AbstractInputPriorityConflictResolver {
 
 	private final List<ExecNode<?, ?>> roots;
+	private final Set<ExecNode<?, ?>> boundaries;
+	private final ExecEdge.DamBehavior safeDamBehavior;
 
-	private TopologyGraph graph;
+	protected TopologyGraph graph;
 
-	public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+	/**
+	 * Create an {@link AbstractInputPriorityConflictResolver} for the given {@link ExecNode} sub-graph.
+	 *
+	 * @param roots the first layer of nodes on the output side of the sub-graph
+	 * @param boundaries the first layer of nodes on the input side of the sub-graph
+	 * @param safeDamBehavior when checking for conflicts we'll ignore the edges with
+	 *                        {@link ExecEdge.DamBehavior} stricter than or equal to this
+	 */
+	public AbstractInputPriorityConflictResolver(
+			List<ExecNode<?, ?>> roots,
+			Set<ExecNode<?, ?>> boundaries,
+			ExecEdge.DamBehavior safeDamBehavior) {
 		Preconditions.checkArgument(
 			roots.stream().allMatch(root -> root instanceof BatchExecNode),
 			"InputPriorityConflictResolver can only be used for batch jobs.");
 		this.roots = roots;
+		this.boundaries = boundaries;
+		this.safeDamBehavior = safeDamBehavior;
 	}
 
-	public void detectAndResolve() {
+	protected void createTopologyGraphAndResolveConflict() {
 		// build an initial topology graph
-		graph = new TopologyGraph(roots);
+		graph = new TopologyGraph(roots, boundaries);
 
 		// check and resolve conflicts about input priorities
 		AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new AbstractExecNodeExactlyOnceVisitor() {
 			@Override
 			protected void visitNode(ExecNode<?, ?> node) {
-				visitInputs(node);
-				checkInputPriorities(node);
+				if (!boundaries.contains(node)) {
+					visitInputs(node);
+				}
+				updateTopologyGraphAndResolveConflict(node);
 			}
 		};
 		roots.forEach(n -> n.accept(inputPriorityVisitor));
 	}
 
-	private void checkInputPriorities(ExecNode<?, ?> node) {
+	private void updateTopologyGraphAndResolveConflict(ExecNode<?, ?> node) {

Review comment:
       updateTopologyGraph
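The visitor in `createTopologyGraphAndResolveConflict` visits each node once, descends into a node's inputs only when the node is not a boundary, and updates the graph after those inputs have been handled. A minimal sketch of that traversal order, assuming a hypothetical `SketchNode` type in place of `ExecNode` and a recorded visit list in place of the graph update:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical minimal DAG node standing in for ExecNode (identity equality). */
final class SketchNode {
    final String name;
    final List<SketchNode> inputs;

    SketchNode(String name, SketchNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

/**
 * Visits every node reachable from the roots exactly once, inputs first,
 * without descending past boundary nodes. Boundary nodes themselves are still visited.
 */
final class BoundedExactlyOnceVisitor {
    private final Set<SketchNode> boundaries;
    private final Set<SketchNode> visited = new HashSet<>();
    private final List<String> visitOrder = new ArrayList<>();

    BoundedExactlyOnceVisitor(Set<SketchNode> boundaries) {
        this.boundaries = boundaries;
    }

    List<String> visitAll(List<SketchNode> roots) {
        for (SketchNode root : roots) {
            visit(root);
        }
        return Collections.unmodifiableList(visitOrder);
    }

    private void visit(SketchNode node) {
        if (!visited.add(node)) {
            return; // already reached through another path
        }
        if (!boundaries.contains(node)) {
            for (SketchNode input : node.inputs) {
                visit(input);
            }
        }
        // stands in for the per-node graph update, which runs after the inputs
        visitOrder.add(node.name);
    }
}
```

For a root reading `b` and `c`, where `c` also reads `b` and `b` is a boundary, this sketch visits `b`, `c`, `root` in that order and never touches the inputs of `b`.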

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/TopologyGraph.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A data structure storing the topological and input priority information of an {@link ExecNode} graph.
+ */
+@Internal
+class TopologyGraph {
+
+	private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+
+	TopologyGraph(List<ExecNode<?, ?>> roots) {
+		this(roots, Collections.emptySet());
+	}
+
+	TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> boundaries) {
+		this.nodes = new HashMap<>();
+
+		// we first link all edges in the original exec node graph
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				if (boundaries.contains(node)) {
+					return;
+				}
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					link(input, node);
+				}
+				visitInputs(node);
+			}
+		};
+		roots.forEach(n -> n.accept(visitor));
+	}
+
+	/**
+	 * Link an edge from `from` node to `to` node if no loop will occur after adding this edge.
+	 * Returns whether this edge was successfully added.
+	 */
+	boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+
+		if (canReach(toNode, fromNode)) {
+			// invalid edge, as `to` is the predecessor of `from`
+			return false;
+		} else {
+			// link `from` and `to`
+			fromNode.outputs.add(toNode);
+			toNode.inputs.add(fromNode);
+			return true;
+		}
+	}
+
+	/**
+	 * Remove the edge from `from` node to `to` node. If there is no edge between them then do nothing.
+	 */
+	void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+
+		fromNode.outputs.remove(toNode);
+		toNode.inputs.remove(fromNode);
+	}
+
+	/**
+	 * Calculate the maximum distance of the currently added nodes from the nodes without inputs.
+	 * The smallest order is 0 (which are exactly the nodes without inputs) and the distances of

Review comment:
       order -> distance
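`TopologyGraph.link` only adds an edge when the target cannot already reach the source, and the Javadoc under review describes a maximum distance from the nodes without inputs. A sketch of both under stated assumptions: nodes are plain `String` ids instead of `ExecNode`s, reachability is a BFS like the removed `canReach`, and the distance (a longest path in the DAG) is computed with Kahn's topological ordering, which may differ from how the PR computes it. `TopologyGraphSketch` and `calculateMaximumDistance` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Sketch: an edge from A to B means A's results must be read before B's. */
final class TopologyGraphSketch {
    private final Map<String, Set<String>> outputs = new LinkedHashMap<>();
    private final Map<String, Set<String>> inputs = new LinkedHashMap<>();

    private void ensureNode(String node) {
        outputs.computeIfAbsent(node, k -> new HashSet<>());
        inputs.computeIfAbsent(node, k -> new HashSet<>());
    }

    /** Adds from -> to unless it would close a loop; returns whether the edge was added. */
    boolean link(String from, String to) {
        ensureNode(from);
        ensureNode(to);
        if (canReach(to, from)) {
            return false; // `to` already precedes `from`
        }
        outputs.get(from).add(to);
        inputs.get(to).add(from);
        return true;
    }

    /** Removes from -> to; does nothing if the edge does not exist. */
    void unlink(String from, String to) {
        Set<String> out = outputs.get(from);
        Set<String> in = inputs.get(to);
        if (out != null) out.remove(to);
        if (in != null) in.remove(from);
    }

    /** Breadth-first search along output edges. */
    boolean canReach(String from, String to) {
        Set<String> visited = new HashSet<>();
        Queue<String> queue = new ArrayDeque<>();
        visited.add(from);
        queue.offer(from);
        while (!queue.isEmpty()) {
            String node = queue.poll();
            if (node.equals(to)) return true;
            for (String next : outputs.getOrDefault(node, Collections.emptySet())) {
                if (visited.add(next)) queue.offer(next);
            }
        }
        return false;
    }

    /** Longest distance of each node from the nodes without inputs (which get distance 0). */
    Map<String, Integer> calculateMaximumDistance() {
        Map<String, Integer> remainingInputs = new HashMap<>();
        Map<String, Integer> distance = new HashMap<>();
        Queue<String> queue = new ArrayDeque<>();
        for (Map.Entry<String, Set<String>> e : inputs.entrySet()) {
            remainingInputs.put(e.getKey(), e.getValue().size());
            if (e.getValue().isEmpty()) {
                distance.put(e.getKey(), 0);
                queue.offer(e.getKey());
            }
        }
        // a node is final once all of its inputs have been processed
        while (!queue.isEmpty()) {
            String node = queue.poll();
            int d = distance.get(node);
            for (String next : outputs.get(node)) {
                distance.merge(next, d + 1, Math::max);
                if (remainingInputs.merge(next, -1, Integer::sum) == 0) {
                    queue.offer(next);
                }
            }
        }
        return distance;
    }
}
```

Because `link` never admits a loop, every node eventually reaches zero remaining inputs, so every node gets a distance.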

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -88,41 +76,58 @@
  *
  * <p>This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates
  * that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates
- * a deadlock, and we resolve such deadlock by inserting a {@link BatchExecExchange} with batch shuffle mode.
+ * a deadlock, and different subclasses of this class resolve the conflict in different ways.
  *
  * <p>For a detailed explanation of the algorithm, see appendix of the
  * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
  */
 @Internal
-public class InputPriorityConflictResolver {
+public abstract class AbstractInputPriorityConflictResolver {

Review comment:
       `InputPriorityBasedTopologyGraphGenerator`? We could even simplify it to `TopologyGraphGenerator`.
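The class Javadoc models an edge from A to B as "A's results must be read before B's" and treats a loop as a deadlock. To illustrate that rule on a whole graph, here is a three-color depth-first loop check over hypothetical `String` node ids. This is a swapped-in technique for illustration: `TopologyGraph` in this PR avoids loops incrementally, by refusing an edge in `link`, rather than scanning the finished graph.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: reports whether a set of "read A before B" constraints contains a loop (a deadlock). */
final class ReadOrderDeadlockCheck {
    private enum Color { WHITE, GRAY, BLACK }

    private final Map<String, List<String>> readBefore = new HashMap<>();

    /** Records that {@code first} must be read before {@code then}. */
    void addConstraint(String first, String then) {
        readBefore.computeIfAbsent(first, k -> new ArrayList<>()).add(then);
        readBefore.computeIfAbsent(then, k -> new ArrayList<>());
    }

    boolean hasDeadlock() {
        Map<String, Color> colors = new HashMap<>();
        for (String node : readBefore.keySet()) {
            colors.put(node, Color.WHITE);
        }
        for (String node : readBefore.keySet()) {
            if (colors.get(node) == Color.WHITE && dfsFindsLoop(node, colors)) {
                return true;
            }
        }
        return false;
    }

    private boolean dfsFindsLoop(String node, Map<String, Color> colors) {
        colors.put(node, Color.GRAY); // node is on the current DFS path
        for (String next : readBefore.getOrDefault(node, Collections.emptyList())) {
            Color color = colors.get(next);
            if (color == Color.GRAY) {
                return true; // back edge: the constraints form a loop
            }
            if (color == Color.WHITE && dfsFindsLoop(next, colors)) {
                return true;
            }
        }
        colors.put(node, Color.BLACK); // fully explored, no loop through this node
        return false;
    }
}
```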

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecMultipleInputNode.scala
##########
@@ -99,6 +100,14 @@ class BatchExecMultipleInputNode(
     val memoryKB = generator.getManagedMemoryWeight
     ExecNode.setManagedMemoryWeight(multipleInputTransform, memoryKB * 1024)
 
+    if (withSourceChaining) {
+      // set chaining strategy for source chaining
+      multipleInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES)

Review comment:
       What if we always set the ChainingStrategy to `HEAD_WITH_SOURCES`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputOrderCalculator.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;

Review comment:
       the package name is not correct

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolverWithExchange.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+
+import org.apache.calcite.rel.RelNode;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Subclass of the {@link AbstractInputPriorityConflictResolver}.
+ *
+ * <p>This class resolves conflicts by inserting a {@link BatchExecExchange} into the conflicting input.
+ */
+@Internal
+public class InputPriorityConflictResolverWithExchange extends AbstractInputPriorityConflictResolver {

Review comment:
       InputPriorityConflictResolver
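The split discussed here, with detection in an abstract base class and resolution in subclasses, is a template-method design. A minimal sketch with hypothetical class names and nodes reduced to strings; the real subclass builds a `BatchExecExchange` rather than recording a message, and the real detection walks the topology graph rather than reading a flag array.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: detection is fixed, resolution is left to subclasses. */
abstract class ConflictResolverSketch {
    /** Called once per conflicting input; subclasses decide how to fix it. */
    protected abstract void resolveConflict(String node, int conflictInput);

    /** Stand-in for the detection phase: flags are precomputed here for simplicity. */
    final void detectAndResolve(String node, boolean[] conflicting) {
        for (int i = 0; i < conflicting.length; i++) {
            if (conflicting[i]) {
                resolveConflict(node, i);
            }
        }
    }
}

/** Records where a batch exchange would be inserted, mimicking the exchange-based subclass. */
final class ExchangeInsertingResolverSketch extends ConflictResolverSketch {
    final List<String> insertedExchanges = new ArrayList<>();

    @Override
    protected void resolveConflict(String node, int conflictInput) {
        insertedExchanges.add(node + "#input" + conflictInput + " <- BATCH exchange");
    }
}
```

Another resolution strategy would only need to override `resolveConflict`, leaving the detection code untouched.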

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -205,146 +205,5 @@ protected void visitNode(ExecNode<?, ?> node) {
 		return ret;
 	}
 
-	private BatchExecExchange createExchange(ExecNode<?, ?> node, int idx) {
-		RelNode inputRel = (RelNode) node.getInputNodes().get(idx);
-
-		FlinkRelDistribution distribution;
-		ExecEdge.RequiredShuffle requiredShuffle = node.getInputEdges().get(idx).getRequiredShuffle();
-		if (requiredShuffle.getType() == ExecEdge.ShuffleType.HASH) {
-			distribution = FlinkRelDistribution.hash(requiredShuffle.getKeys(), true);
-		} else if (requiredShuffle.getType() == ExecEdge.ShuffleType.BROADCAST) {
-			// should not occur
-			throw new IllegalStateException(
-				"Trying to resolve input priority conflict on broadcast side. This is not expected.");
-		} else if (requiredShuffle.getType() == ExecEdge.ShuffleType.SINGLETON) {
-			distribution = FlinkRelDistribution.SINGLETON();
-		} else {
-			distribution = FlinkRelDistribution.ANY();
-		}
-
-		BatchExecExchange exchange = new BatchExecExchange(
-			inputRel.getCluster(),
-			inputRel.getTraitSet().replace(distribution),
-			inputRel,
-			distribution);
-		exchange.setRequiredShuffleMode(ShuffleMode.BATCH);
-		return exchange;
-	}
-
-	/**
-	 * A data structure storing the topological information of an {@link ExecNode} graph.
-	 */
-	@VisibleForTesting
-	static class TopologyGraph {
-		private final Map<ExecNode<?, ?>, TopologyNode> nodes;
-
-		TopologyGraph(List<ExecNode<?, ?>> roots) {
-			this.nodes = new HashMap<>();
-
-			// we first link all edges in the original exec node graph
-			AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
-				@Override
-				protected void visitNode(ExecNode<?, ?> node) {
-					for (ExecNode<?, ?> input : node.getInputNodes()) {
-						link(input, node);
-					}
-					visitInputs(node);
-				}
-			};
-			roots.forEach(n -> n.accept(visitor));
-		}
-
-		/**
-		 * Link an edge from `from` node to `to` node if no loop will occur after adding this edge.
-		 * Returns if this edge is successfully added.
-		 */
-		boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
-			TopologyNode fromNode = getTopologyNode(from);
-			TopologyNode toNode = getTopologyNode(to);
-
-			if (canReach(toNode, fromNode)) {
-				// invalid edge, as `to` is the predecessor of `from`
-				return false;
-			} else {
-				// link `from` and `to`
-				fromNode.outputs.add(toNode);
-				toNode.inputs.add(fromNode);
-				return true;
-			}
-		}
-
-		/**
-		 * Remove the edge from `from` node to `to` node. If there is no edge between them then do nothing.
-		 */
-		void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
-			TopologyNode fromNode = getTopologyNode(from);
-			TopologyNode toNode = getTopologyNode(to);
-
-			fromNode.outputs.remove(toNode);
-			toNode.inputs.remove(fromNode);
-		}
-
-		@VisibleForTesting
-		boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) {
-			TopologyNode fromNode = getTopologyNode(from);
-			TopologyNode toNode = getTopologyNode(to);
-			return canReach(fromNode, toNode);
-		}
-
-		private boolean canReach(TopologyNode from, TopologyNode to) {
-			Set<TopologyNode> visited = new HashSet<>();
-			visited.add(from);
-			Queue<TopologyNode> queue = new LinkedList<>();
-			queue.offer(from);
-
-			while (!queue.isEmpty()) {
-				TopologyNode node = queue.poll();
-				if (to.equals(node)) {
-					return true;
-				}
-
-				for (TopologyNode next : node.outputs) {
-					if (visited.contains(next)) {
-						continue;
-					}
-					visited.add(next);
-					queue.offer(next);
-				}
-			}
-
-			return false;
-		}
-
-		private TopologyNode getTopologyNode(ExecNode<?, ?> execNode) {
-			// NOTE: We treat different `BatchExecBoundedStreamScan`s with same `DataStream` object as the same
-			if (execNode instanceof BatchExecBoundedStreamScan) {
-				DataStream<?> currentStream =
-					((BatchExecBoundedStreamScan) execNode).boundedStreamTable().dataStream();
-				for (Map.Entry<ExecNode<?, ?>, TopologyNode> entry : nodes.entrySet()) {
-					ExecNode<?, ?> key = entry.getKey();
-					if (key instanceof BatchExecBoundedStreamScan) {
-						DataStream<?> existingStream =
-							((BatchExecBoundedStreamScan) key).boundedStreamTable().dataStream();
-						if (existingStream.equals(currentStream)) {
-							return entry.getValue();
-						}
-					}
-				}
-
-				TopologyNode result = new TopologyNode();
-				nodes.put(execNode, result);
-				return result;
-			} else {
-				return nodes.computeIfAbsent(execNode, k -> new TopologyNode());
-			}
-		}
-	}
-
-	/**
-	 * A node in the {@link TopologyGraph}.
-	 */
-	private static class TopologyNode {
-		private final Set<TopologyNode> inputs = new HashSet<>();
-		private final Set<TopologyNode> outputs = new HashSet<>();
-	}
+	protected abstract void resolveConflict(ExecNode<?, ?> node, int conflictInput);

Review comment:
       resolveInputPriorityConflict
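The removed `createExchange` chose the exchange's distribution from the conflicting input's required shuffle: hash keys are kept, singleton stays singleton, broadcast is rejected as unexpected, and anything else becomes `ANY`; the exchange was then given `ShuffleMode.BATCH`. A sketch of only the distribution choice, using hypothetical `ShuffleTypeSketch` and `DistributionSketch` types instead of `ExecEdge.ShuffleType` and `FlinkRelDistribution`:

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the required shuffle type; ANY covers "any other type". */
enum ShuffleTypeSketch { HASH, BROADCAST, SINGLETON, ANY }

/** Hypothetical stand-in for a distribution: a kind plus optional hash keys. */
final class DistributionSketch {
    final String kind;
    final List<Integer> keys;

    private DistributionSketch(String kind, List<Integer> keys) {
        this.kind = kind;
        this.keys = keys;
    }

    /** Mirrors the branch structure of the removed createExchange. */
    static DistributionSketch forConflictingInput(ShuffleTypeSketch type, List<Integer> keys) {
        switch (type) {
            case HASH:
                return new DistributionSketch("hash", keys);
            case BROADCAST:
                // resolving a conflict on the broadcast side is not expected
                throw new IllegalStateException(
                        "Trying to resolve input priority conflict on broadcast side. This is not expected.");
            case SINGLETON:
                return new DistributionSketch("singleton", Collections.emptyList());
            default:
                return new DistributionSketch("any", Collections.emptyList());
        }
    }
}
```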







[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 9d6abe331e7dd4ef610dbc826aee44216ce14894 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8406) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098) 
   * 02e0db125833bf12ab5856b08a82a03fc5454958 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8276) 
   * 1121f5b1525294821397a62519e33f21c01a097a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8287) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 006756c81fa5c333b6fdc4bcbfc8820492a18e9b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8356) 
   * 9d6abe331e7dd4ef610dbc826aee44216ce14894 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8406) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] TsReaper commented on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-717649912


   Azure passed in https://dev.azure.com/tsreaper96/Flink/_build/results?buildId=100&view=results





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 006756c81fa5c333b6fdc4bcbfc8820492a18e9b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8356) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 006756c81fa5c333b6fdc4bcbfc8820492a18e9b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8356) 
   * 9d6abe331e7dd4ef610dbc826aee44216ce14894 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] godfreyhe commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r512431935



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/utils/TopologyGraph.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A data structure storing the topological and input priority information of an {@link ExecNode} graph.
+ */
+@Internal
+class TopologyGraph {
+
+	private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+
+	TopologyGraph(List<ExecNode<?, ?>> roots) {
+		this(roots, Collections.emptySet());
+	}
+
+	TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> boundaries) {
+		this.nodes = new HashMap<>();
+
+		// we first link all edges in the original exec node graph
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				if (boundaries.contains(node)) {
+					return;
+				}
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					link(input, node);
+				}
+				visitInputs(node);
+			}
+		};
+		roots.forEach(n -> n.accept(visitor));
+	}
+
+	/**
+	 * Link an edge from `from` node to `to` node if no loop will occur after adding this edge.
+	 * Returns whether this edge is successfully added.
+	 */
+	boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+
+		if (canReach(toNode, fromNode)) {
+			// invalid edge, as `to` is the predecessor of `from`
+			return false;
+		} else {
+			// link `from` and `to`
+			fromNode.outputs.add(toNode);
+			toNode.inputs.add(fromNode);
+			return true;
+		}
+	}
+
+	/**
+	 * Remove the edge from `from` node to `to` node. If there is no edge between them then do nothing.
+	 */
+	void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+
+		fromNode.outputs.remove(toNode);
+		toNode.inputs.remove(fromNode);
+	}
+
+	/**
+	 * Calculate the maximum distance of the currently added nodes from the nodes without inputs.
+	 * The smallest distance is 0 (which are exactly the nodes without inputs) and the distances of
+	 * other nodes are the largest distances in their inputs plus 1.
+	 */
+	Map<ExecNode<?, ?>, Integer> calculateDistance() {

Review comment:
       give `distance` a definition?
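The Javadoc above defines distance only implicitly. Below is a minimal sketch of one way to pin it down, assuming nodes are plain `int` ids instead of `ExecNode`s. The distance of a node is the length of the longest input path leading to it: every node without inputs has distance 0, and every other node has 1 plus the maximum distance of its inputs. `TopologyDistanceSketch` and its methods are hypothetical names that mirror `link`, `canReach` and `calculateDistance` in the hunk. They are not the PR's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of TopologyGraph's link / canReach / calculateDistance on int nodes. */
class TopologyDistanceSketch {
    private final List<List<Integer>> inputs = new ArrayList<>();
    private final List<List<Integer>> outputs = new ArrayList<>();

    TopologyDistanceSketch(int numNodes) {
        for (int i = 0; i < numNodes; i++) {
            inputs.add(new ArrayList<>());
            outputs.add(new ArrayList<>());
        }
    }

    /** Adds from -> to unless `to` can already reach `from`, which would close a loop. */
    boolean link(int from, int to) {
        if (canReach(to, from)) {
            return false;
        }
        outputs.get(from).add(to);
        inputs.get(to).add(from);
        return true;
    }

    /** Breadth-first search along output edges. */
    boolean canReach(int from, int to) {
        boolean[] visited = new boolean[outputs.size()];
        Queue<Integer> queue = new ArrayDeque<>();
        queue.offer(from);
        visited[from] = true;
        while (!queue.isEmpty()) {
            int node = queue.poll();
            if (node == to) {
                return true;
            }
            for (int next : outputs.get(node)) {
                if (!visited[next]) {
                    visited[next] = true;
                    queue.offer(next);
                }
            }
        }
        return false;
    }

    /** distance(v) = 0 if v has no inputs, otherwise 1 + max(distance(u)) over inputs u. */
    Map<Integer, Integer> calculateDistance() {
        Map<Integer, Integer> result = new HashMap<>();
        int[] inputsVisited = new int[inputs.size()];
        Queue<Integer> queue = new ArrayDeque<>();
        for (int v = 0; v < inputs.size(); v++) {
            if (inputs.get(v).isEmpty()) {
                queue.offer(v);
            }
        }
        while (!queue.isEmpty()) {
            int v = queue.poll();
            int dist = -1;
            for (int u : inputs.get(v)) {
                dist = Math.max(dist, result.get(u));
            }
            result.put(v, dist + 1);
            // an output is ready once all of its inputs already have a distance
            for (int w : outputs.get(v)) {
                if (++inputsVisited[w] == inputs.get(w).size()) {
                    queue.offer(w);
                }
            }
        }
        return result;
    }
}
```

`link` refuses any edge that would close a loop, so the graph stays acyclic. As a result, the Kahn-style traversal in `calculateDistance` assigns a distance to every node exactly once.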

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.table.planner.plan.processor.utils.InputOrderCalculator;
+import org.apache.flink.table.planner.plan.processor.utils.InputPriorityConflictResolver;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> roots, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function call to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output node cannot be merged
+			// into the same multiple input node
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				roots,
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> rootWrappers = wrapExecNodes(roots);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(rootWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(rootWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> rootNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		rootNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> rootWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> root : rootNodes) {
+			ExecNodeWrapper rootWrapper = wrapperMap.get(root);
+			Preconditions.checkNotNull(rootWrapper, "Root node is not wrapped. This is a bug.");
+			rootWrappers.add(rootWrapper);
+		}
+		return rootWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> rootWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(rootWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper into the same group with its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// all our attempts failed, this node will not be in a multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be a member of multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof Exchange) {
+			// exchange cannot be a member of multiple input node
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * A node can only be assigned into the same multiple input group of its outputs
+	 * if all outputs have a group and are the same.
+	 *
+	 * @return the {@link MultipleInputGroup} of the outputs if all outputs have a
+	 *         group and are the same, null otherwise
+	 */
+	private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+		if (wrapper.outputs.isEmpty()) {
+			return null;
+		}
+
+		MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+		if (outputGroup == null) {
+			return null;
+		}
+
+		for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+			if (outputWrapper.group != outputGroup) {
+				return null;
+			}
+		}
+
+		return outputGroup;
+	}
+
+	private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		// only a node with more than one input can be the root,
+		// as chaining of one-input operators is handled by operator chains
+		return wrapper.inputs.size() >= 2;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Optimizing
+	// --------------------------------------------------------------------------------
+
+	private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sources to sinks
+		for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+			ExecNodeWrapper wrapper = orderedWrappers.get(i);
+			MultipleInputGroup group = wrapper.group;
+			if (group == null) {
+				// we only consider nodes currently in a multiple input group
+				continue;
+			}
+
+			boolean isUnion = wrapper.execNode instanceof Union;
+
+			if (group.members.size() == 1) {
+				Preconditions.checkState(
+					wrapper == group.root,
+					"The only member of a multiple input group is not its root. This is a bug.");
+				// optimization 1. we clean up multiple input groups with only 1 member,
+				// unless one of its inputs is a FLIP-27 source (for maximizing source chaining),
+				// however unions do not apply to this optimization because they're not real operators
+				if (isUnion || wrapper.inputs.stream().noneMatch(
+						inputWrapper -> isChainableSource(inputWrapper.execNode))) {
+					wrapper.removeFromGroup();
+				}
+				continue;
+			}
+
+			if (!isEntranceOfMultipleInputGroup(wrapper)) {
+				// we're not removing a node from the middle of a multiple input group
+				continue;
+			}
+
+			boolean shouldRemove = false;
+			if (isUnion) {
+				// optimization 2. we do not allow union to be the tail of a multiple input
+				// as we're paying extra function calls for this, unless one of the unioned
+				// inputs is a FLIP-27 source
+				shouldRemove = wrapper.inputs.stream().noneMatch(
+					inputWrapper -> isChainableSource(inputWrapper.execNode));
+			} else if (wrapper.inputs.size() == 1) {
+				// optimization 3. for one-input operators we'll remove it unless its input
+				// is an exchange or a FLIP-27 source, this is mainly to avoid the following
+				// pattern:
+				// non-chainable source -> calc --\
+				//                                 join ->
+				// non-chainable source -> calc --/
+				// if we move two calcs into the multiple input group rooted at the join, we're
+				// directly shuffling large amount of records from the source without filtering
+				// by the calc
+				ExecNode<?, ?> input = wrapper.inputs.get(0).execNode;
+				shouldRemove = !(input instanceof Exchange) && !isChainableSource(input);
+			}
+
+			// optimization 4. for singleton operations (for example singleton global agg)
+			// we're not including it into the multiple input node as we have to ensure that
+			// the whole multiple input can only have 1 parallelism.
+			// consecutive singleton operations connected by a forwarding shuffle are handled
+			// together with optimization 3
+			shouldRemove |= wrapper.inputs.stream().anyMatch(inputWrapper ->
+				inputWrapper.execNode instanceof BatchExecExchange &&

Review comment:
       should also consider StreamExecExchange here? BatchExecExchange -> Exchange
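The reviewer's point can be illustrated with a short sketch. Checking against the common supertype matches batch and streaming exchanges alike, whereas `instanceof BatchExecExchange` only matches the batch one. The hunk itself already uses that supertype (`Exchange`) in `canBeMultipleInputNodeMember`. The classes below are hypothetical stand-ins. The `singleton` flag is a placeholder for the distribution check, which is cut off in the quoted line.

```java
// Hypothetical stand-ins for the planner's exchange nodes; in the PR the common supertype
// is Calcite's Exchange and the concrete nodes are BatchExecExchange / StreamExecExchange.
abstract class ExchangeStandIn {
    // placeholder for the (truncated) distribution check on the exchange
    final boolean singleton;

    ExchangeStandIn(boolean singleton) {
        this.singleton = singleton;
    }
}

final class BatchExchangeStandIn extends ExchangeStandIn {
    BatchExchangeStandIn(boolean singleton) {
        super(singleton);
    }
}

final class StreamExchangeStandIn extends ExchangeStandIn {
    StreamExchangeStandIn(boolean singleton) {
        super(singleton);
    }
}

final class SingletonInputCheck {
    /** Mirrors the quoted condition: only the batch exchange type is recognized. */
    static boolean batchOnly(Object input) {
        return input instanceof BatchExchangeStandIn && ((ExchangeStandIn) input).singleton;
    }

    /** Checks the common supertype instead, so streaming exchanges are covered too. */
    static boolean anyExchange(Object input) {
        return input instanceof ExchangeStandIn && ((ExchangeStandIn) input).singleton;
    }
}
```

Because `createMultipleInputGroups` and `optimizeMultipleInputGroups` run in both batch and streaming mode, the batch-only check would silently skip optimization 4 for streaming plans.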

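The grouping pass in the `MultipleInputNodeCreationProcessor` hunk relies on the sink-first order produced by `topologicalSort`. Each node is queued only after all of its outputs have been emitted, so `canBeInSameGroupWithOutputs` always sees the final groups of those outputs. Below is a minimal sketch of both steps under simplifying assumptions: a hypothetical `WrapperSketch` replaces `ExecNodeWrapper`, a boolean flag replaces `instanceof Exchange`, and each group is identified by its root node. The optimization pass is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stand-in for ExecNodeWrapper. */
final class WrapperSketch {
    final String name;
    final boolean isExchange; // stands in for `execNode instanceof Exchange`
    final List<WrapperSketch> inputs = new ArrayList<>();
    final List<WrapperSketch> outputs = new ArrayList<>();
    WrapperSketch groupRoot; // root of the group this node belongs to, or null

    WrapperSketch(String name, boolean isExchange) {
        this.name = name;
        this.isExchange = isExchange;
    }

    void addInput(WrapperSketch input) {
        inputs.add(input);
        input.outputs.add(this);
    }
}

final class GroupingSketch {
    /** Sinks first: a node is queued once every one of its outputs has been emitted. */
    static List<WrapperSketch> topologicalSort(List<WrapperSketch> roots) {
        List<WrapperSketch> result = new ArrayList<>();
        Queue<WrapperSketch> queue = new ArrayDeque<>(roots);
        Map<WrapperSketch, Integer> visitCount = new HashMap<>();
        while (!queue.isEmpty()) {
            WrapperSketch wrapper = queue.poll();
            result.add(wrapper);
            for (WrapperSketch input : wrapper.inputs) {
                int count = visitCount.merge(input, 1, Integer::sum);
                if (count == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }

    /** Walks sinks to sources, joining the outputs' common group or starting a new one. */
    static void createGroups(List<WrapperSketch> ordered) {
        for (WrapperSketch wrapper : ordered) {
            if (wrapper.inputs.isEmpty() || wrapper.isExchange) {
                continue; // sources and exchanges never become members
            }
            WrapperSketch outputRoot = commonOutputGroup(wrapper);
            if (outputRoot != null) {
                wrapper.groupRoot = outputRoot;
            } else if (wrapper.inputs.size() >= 2) {
                wrapper.groupRoot = wrapper; // a multi-input node starts its own group
            }
        }
    }

    /** Returns the group shared by all outputs, or null if there is none. */
    static WrapperSketch commonOutputGroup(WrapperSketch wrapper) {
        if (wrapper.outputs.isEmpty()) {
            return null;
        }
        WrapperSketch root = wrapper.outputs.get(0).groupRoot;
        for (WrapperSketch output : wrapper.outputs) {
            if (output.groupRoot != root) {
                return null;
            }
        }
        return root;
    }
}
```

Consider a tiny plan `src1 -> calc -> join`, `src2 -> exchange -> join`, `join -> sink`. The join becomes a group root and the calc joins its group. The exchange, both sources, and the single-input sink stay outside. If `src1` is not a chainable FLIP-27 source, optimization 3 in the hunk would presumably take the calc back out of the group afterwards.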
##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/processor/utils/InputPriorityConflictResolverTest.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor.utils;
+
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests for {@link InputPriorityConflictResolver}.
+ */
+public class InputPriorityConflictResolverTest {
+
+	@Test
+	public void testDetectAndResolve() {
+		// P = ExecEdge.DamBehavior.PIPELINED, E = ExecEdge.DamBehavior.END_INPUT
+		// P100 = PIPELINED + priority 100
+		//
+		// 0 --------(P0)----> 1 --(P0)-----------> 7
+		//  \                    \-(P0)-> 2 -(P0)--/
+		//   \-------(P0)----> 3 --(P1)-----------/
+		//    \------(P0)----> 4 --(P10)---------/
+		//     \              /                 /
+		//      \    8 -(P0)-<                 /
+		//       \            \               /
+		//        \--(E0)----> 5 --(P10)-----/
+		// 6 ---------(P100)----------------/
+		TestingBatchExecNode[] nodes = new TestingBatchExecNode[9];
+		for (int i = 0; i < nodes.length; i++) {
+			nodes[i] = new TestingBatchExecNode();
+		}
+		nodes[1].addInput(nodes[0], ExecEdge.builder().priority(0).build());
+		nodes[2].addInput(nodes[1], ExecEdge.builder().priority(0).build());
+		nodes[3].addInput(nodes[0], ExecEdge.builder().priority(0).build());
+		nodes[4].addInput(nodes[8], ExecEdge.builder().priority(0).build());
+		nodes[4].addInput(nodes[0], ExecEdge.builder().priority(0).build());
+		nodes[5].addInput(nodes[8], ExecEdge.builder().priority(0).build());
+		nodes[5].addInput(nodes[0], ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).priority(0).build());
+		nodes[7].addInput(nodes[1], ExecEdge.builder().priority(0).build());
+		nodes[7].addInput(nodes[2], ExecEdge.builder().priority(0).build());
+		nodes[7].addInput(nodes[3], ExecEdge.builder().priority(1).build());
+		nodes[7].addInput(nodes[4], ExecEdge.builder().priority(10).build());
+		nodes[7].addInput(nodes[5], ExecEdge.builder().priority(10).build());
+		nodes[7].addInput(nodes[6], ExecEdge.builder().priority(100).build());
+
+		InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+			Collections.singletonList(nodes[7]),
+			ExecEdge.DamBehavior.END_INPUT,
+			ShuffleMode.BATCH);
+		resolver.detectAndResolve();
+		Assert.assertEquals(nodes[1], nodes[7].getInputNodes().get(0));
+		Assert.assertEquals(nodes[2], nodes[7].getInputNodes().get(1));
+		Assert.assertTrue(nodes[7].getInputNodes().get(2) instanceof BatchExecExchange);

Review comment:
       also check the shuffle mode?
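Below is a sketch of what the extra assertion could check. The accessor that exposes an exchange's shuffle mode in the planner is not shown in this hunk, so the sketch uses hypothetical stand-ins. Besides the `instanceof` check, it verifies that the inserted exchange carries the `ShuffleMode.BATCH` value that this test passes to the resolver. `ShuffleModeStandIn`, `ExchangeNodeStandIn.getShuffleMode()` and `assertExchangeWithMode` are illustrative names only.

```java
import java.util.List;

// Hypothetical stand-in for ShuffleMode, limited to the two values used in this PR.
enum ShuffleModeStandIn { PIPELINED, BATCH }

// Hypothetical stand-in for an exchange node that remembers the shuffle mode it was created with.
final class ExchangeNodeStandIn {
    private final ShuffleModeStandIn shuffleMode;

    ExchangeNodeStandIn(ShuffleModeStandIn shuffleMode) {
        this.shuffleMode = shuffleMode;
    }

    ShuffleModeStandIn getShuffleMode() {
        return shuffleMode;
    }
}

final class ShuffleModeAssertions {
    /** Fails unless the input at `index` is an exchange with the expected shuffle mode. */
    static void assertExchangeWithMode(List<Object> inputs, int index, ShuffleModeStandIn expected) {
        Object input = inputs.get(index);
        if (!(input instanceof ExchangeNodeStandIn)) {
            throw new AssertionError("input " + index + " is not an exchange: " + input);
        }
        ShuffleModeStandIn actual = ((ExchangeNodeStandIn) input).getShuffleMode();
        if (actual != expected) {
            throw new AssertionError(
                "input " + index + " has shuffle mode " + actual + ", expected " + expected);
        }
    }
}
```

Checking both properties together catches a resolver that inserts an exchange with the wrong mode. The `instanceof` assertion alone would still pass in that case.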

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecMultipleInputNode.scala
##########
@@ -27,12 +27,12 @@ import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecEdge,
 import org.apache.flink.table.planner.plan.nodes.physical.MultipleInputRel
 import org.apache.flink.table.runtime.operators.multipleinput.{BatchMultipleInputStreamOperatorFactory, TableOperatorWrapperGenerator}
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-
 import java.util
 
+import org.apache.flink.streaming.api.operators.ChainingStrategy

Review comment:
       reorder the import

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/utils/TopologyGraph.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A data structure storing the topological and input priority information of an {@link ExecNode} graph.
+ */
+@Internal
+class TopologyGraph {
+
+	private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+
+	TopologyGraph(List<ExecNode<?, ?>> roots) {
+		this(roots, Collections.emptySet());
+	}
+
+	TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> boundaries) {
+		this.nodes = new HashMap<>();
+
+		// we first link all edges in the original exec node graph
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				if (boundaries.contains(node)) {
+					return;
+				}
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					link(input, node);
+				}
+				visitInputs(node);
+			}
+		};
+		roots.forEach(n -> n.accept(visitor));
+	}
+
+	/**
+	 * Link an edge from `from` node to `to` node if no loop will occur after adding this edge.
+	 * Returns whether this edge is successfully added.
+	 */
+	boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+
+		if (canReach(toNode, fromNode)) {
+			// invalid edge, as `to` can already reach `from`, so adding it would create a loop
+			return false;
+		} else {
+			// link `from` and `to`
+			fromNode.outputs.add(toNode);
+			toNode.inputs.add(fromNode);
+			return true;
+		}
+	}
+
+	/**
+	 * Remove the edge from the `from` node to the `to` node. Do nothing if no such edge exists.
+	 */
+	void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+
+		fromNode.outputs.remove(toNode);
+		toNode.inputs.remove(fromNode);
+	}
+
+	/**
+	 * Calculate the maximum distance of the currently added nodes from the nodes without inputs.
+	 * The smallest distance is 0 (exactly the nodes without inputs), and the distance of every
+	 * other node is the largest distance among its inputs plus 1.
+	 */
+	Map<ExecNode<?, ?>, Integer> calculateDistance() {
+		Map<ExecNode<?, ?>, Integer> result = new HashMap<>();
+		Map<TopologyNode, Integer> inputsVisitedMap = new HashMap<>();
+
+		Queue<TopologyNode> queue = new LinkedList<>();
+		for (TopologyNode node : nodes.values()) {
+			if (node.inputs.size() == 0) {
+				queue.offer(node);
+			}
+		}
+
+		while (!queue.isEmpty()) {
+			TopologyNode node = queue.poll();
+			int dist = -1;
+			for (TopologyNode input : node.inputs) {
+				dist = Math.max(
+					dist,
+					Preconditions.checkNotNull(
+						result.get(input.execNode),
+						"The distance of an input node is not calculated. This is a bug."));
+			}
+			dist++;
+			result.put(node.execNode, dist);
+
+			for (TopologyNode output : node.outputs) {
+				int inputsVisited = inputsVisitedMap.compute(output, (k, v) -> v == null ? 1 : v + 1);
+				if (inputsVisited == output.inputs.size()) {
+					queue.offer(output);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	@VisibleForTesting
+	boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+		TopologyNode fromNode = getTopologyNode(from);
+		TopologyNode toNode = getTopologyNode(to);
+		return canReach(fromNode, toNode);
+	}
+
+	private boolean canReach(TopologyNode from, TopologyNode to) {
+		Set<TopologyNode> visited = new HashSet<>();
+		visited.add(from);
+		Queue<TopologyNode> queue = new LinkedList<>();
+		queue.offer(from);
+
+		while (!queue.isEmpty()) {
+			TopologyNode node = queue.poll();
+			if (to.equals(node)) {
+				return true;
+			}
+
+			for (TopologyNode next : node.outputs) {
+				if (visited.contains(next)) {
+					continue;
+				}
+				visited.add(next);
+				queue.offer(next);
+			}
+		}
+
+		return false;
+	}
+
+	private TopologyNode getTopologyNode(ExecNode<?, ?> execNode) {

Review comment:
       getOrCreateTopologyNode ?
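The cycle check in `link` and the create-on-first-access lookup that the suggested name describes can be illustrated in isolation. The following is a minimal sketch, not the PR's code: `CycleSafeGraph` and its `Vertex` class are hypothetical stand-ins for `TopologyGraph` and `TopologyNode`, keyed by an arbitrary node type instead of `ExecNode`, and it assumes the lookup creates a missing node, which is what `getOrCreateTopologyNode` implies.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for TopologyGraph, keyed by any node type N. */
final class CycleSafeGraph<N> {

    /** Per-node adjacency, mirroring the inputs/outputs of TopologyNode. */
    private static final class Vertex<T> {
        final T node;
        final Set<Vertex<T>> inputs = new LinkedHashSet<>();
        final Set<Vertex<T>> outputs = new LinkedHashSet<>();

        Vertex(T node) {
            this.node = node;
        }
    }

    private final Map<N, Vertex<N>> vertices = new HashMap<>();

    /** Returns the vertex for the node, creating it on first access. */
    private Vertex<N> getOrCreateTopologyNode(N node) {
        return vertices.computeIfAbsent(node, Vertex::new);
    }

    /** Adds the edge from -> to unless it would close a loop; returns whether it was added. */
    boolean link(N from, N to) {
        Vertex<N> fromVertex = getOrCreateTopologyNode(from);
        Vertex<N> toVertex = getOrCreateTopologyNode(to);
        if (canReach(toVertex, fromVertex)) {
            // `to` already reaches `from` (or they are the same node), so the edge would form a loop
            return false;
        }
        fromVertex.outputs.add(toVertex);
        toVertex.inputs.add(fromVertex);
        return true;
    }

    /** Removes the edge from -> to; does nothing if the edge does not exist. */
    void unlink(N from, N to) {
        Vertex<N> fromVertex = getOrCreateTopologyNode(from);
        Vertex<N> toVertex = getOrCreateTopologyNode(to);
        fromVertex.outputs.remove(toVertex);
        toVertex.inputs.remove(fromVertex);
    }

    boolean canReach(N from, N to) {
        return canReach(getOrCreateTopologyNode(from), getOrCreateTopologyNode(to));
    }

    /** Breadth-first search along output edges. */
    private boolean canReach(Vertex<N> from, Vertex<N> to) {
        Set<Vertex<N>> visited = new HashSet<>();
        Queue<Vertex<N>> queue = new ArrayDeque<>();
        visited.add(from);
        queue.offer(from);
        while (!queue.isEmpty()) {
            Vertex<N> current = queue.poll();
            if (current == to) {
                return true;
            }
            for (Vertex<N> next : current.outputs) {
                if (visited.add(next)) {
                    queue.offer(next);
                }
            }
        }
        return false;
    }
}
```

For example, after linking `source -> calc -> join`, a call to `link("join", "source")` returns `false` because `source` can already reach `join`; once `calc -> join` is unlinked, the same edge is accepted.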

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/DeadlockBreakupProcessor.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor;

Review comment:
       move it to `org.apache.flink.table.planner.plan.nodes.process`


[GitHub] [flink] TsReaper commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r511699119



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see the appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only the BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node.
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes from multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(sinkWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> sinkNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		sinkNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> sink : sinkNodes) {
+			ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+			Preconditions.checkNotNull(sinkWrapper, "Sink node is not wrapped. This is a bug.");
+			sinkWrappers.add(sinkWrapper);
+		}
+		return sinkWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper to the same group as its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// if all attempts above failed, this node will not be part of any multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be members of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof BatchExecExchange) {
+			// exchanges cannot be members of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof StreamExecExchange) {
+			// exchanges cannot be members of a multiple input node
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * A node can only be assigned to the same multiple input group as its outputs
+	 * if all of its outputs belong to one and the same group.
+	 *
+	 * @return the {@link MultipleInputGroup} shared by all outputs if such a group exists,
+	 *         null otherwise
+	 */
+	private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+		if (wrapper.outputs.isEmpty()) {
+			return null;
+		}
+
+		MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+		if (outputGroup == null) {
+			return null;
+		}
+
+		for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+			if (outputWrapper.group != outputGroup) {
+				return null;
+			}
+		}
+
+		return outputGroup;
+	}
+
+	private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		// only a node with more than one input can be the root,
+		// as chaining of one-input operators is handled by operator chains
+		return wrapper.inputs.size() >= 2;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Optimizing
+	// --------------------------------------------------------------------------------
+
+	private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sources to sinks
+		for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+			ExecNodeWrapper wrapper = orderedWrappers.get(i);
+			MultipleInputGroup group = wrapper.group;
+			if (group == null) {
+				// we only consider nodes currently in a multiple input group
+				continue;
+			}
+
+			boolean isUnion =
+				wrapper.execNode instanceof BatchExecUnion || wrapper.execNode instanceof StreamExecUnion;
+
+			if (group.members.size() == 1) {
+				Preconditions.checkState(
+					wrapper == group.root,
+					"The only member of a multiple input group is not its root. This is a bug.");
+				// optimization 1. we clean up multiple input groups with only 1 member,
+				// unless one of its inputs is a FLIP-27 source (to maximize source chaining);
+				// however, this exception does not apply to unions because they're not real operators
+				if (isUnion || wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode))) {
+					wrapper.removeFromGroup();
+				}
+				continue;
+			}
+
+			if (!isTailOfMultipleInputGroup(wrapper)) {
+				// we never remove a node from the middle of a multiple input group
+				continue;
+			}
+
+			boolean shouldRemove = false;
+			if (isUnion) {
+				// optimization 2. we do not allow a union to be the tail of a multiple input group,
+				// as this costs extra function calls, unless one of the unioned
+				// inputs is a FLIP-27 source
+				shouldRemove = wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode));
+			} else if (wrapper.inputs.size() == 1) {
+				// optimization 3. a one-input operator is removed unless its input
+				// is an exchange or a FLIP-27 source. This is mainly to avoid the following
+				// pattern:
+				// non-chainable source -> calc --\
+				//                                 join ->
+				// non-chainable source -> calc --/
+				// If we moved the two calcs into the multiple input group rooted at the join,
+				// we would directly shuffle a large amount of records from the sources before
+				// the calcs have a chance to filter them.
+				ExecNode<?, ?> input = wrapper.inputs.get(0).execNode;
+				shouldRemove = !(input instanceof BatchExecExchange) &&
+					!(input instanceof StreamExecExchange) &&
+					!isNewSource(input);
+			}
+
+			// optimization 4. singleton operations (for example, a singleton global agg) are not
+			// included in the multiple input node, as including them would force the whole
+			// multiple input node to run with a parallelism of 1.
+			// Consecutive singleton operations connected by forward shuffles are handled
+			// together with optimization 3.
+			shouldRemove |= wrapper.inputs.stream().anyMatch(inputWrapper ->
+				inputWrapper.execNode instanceof BatchExecExchange &&
+					((BatchExecExchange) inputWrapper.execNode)
+						.distribution.getType() == RelDistribution.Type.SINGLETON);
+
+			if (shouldRemove) {
+				wrapper.removeFromGroup();
+			}
+		}
+	}
+
+	private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		Preconditions.checkNotNull(
+			wrapper.group,
+			"Exec node wrapper does not have a multiple input group. This is a bug.");
+		for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+			if (inputWrapper.group == wrapper.group) {
+				// one of the inputs is in the same group, so this node is not the tail of the group
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private boolean isNewSource(ExecNode<?, ?> node) {
+		if (node instanceof BatchExecBoundedStreamScan) {
+			BatchExecBoundedStreamScan scan = (BatchExecBoundedStreamScan) node;
+			return scan.boundedStreamTable().dataStream().getTransformation() instanceof SourceTransformation;
+		} else if (node instanceof StreamExecDataStreamScan) {
+			StreamExecDataStreamScan scan = (StreamExecDataStreamScan) node;
+			return scan.dataStreamTable().dataStream().getTransformation() instanceof SourceTransformation;
+		}
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Nodes Creating
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNode<?, ?>> createMultipleInputNodes(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNode<?, ?>> result = new ArrayList<>();
+		Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap = new HashMap<>();
+		for (ExecNodeWrapper sinkWrapper : sinkWrappers) {
+			result.add(getMultipleInputNode(sinkWrapper, visitMap));
+		}
+		return result;
+	}
+
+	private ExecNode<?, ?> getMultipleInputNode(
+			ExecNodeWrapper wrapper,
+			Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap) {
+		if (visitMap.containsKey(wrapper)) {
+			return visitMap.get(wrapper);
+		}
+
+		for (int i = 0; i < wrapper.inputs.size(); i++) {
+			wrapper.execNode.replaceInputNode(i, (ExecNode) getMultipleInputNode(wrapper.inputs.get(i), visitMap));
+		}
+
+		ExecNode<?, ?> ret;
+		if (wrapper.group != null && wrapper == wrapper.group.root) {
+			ret = createMultipleInputNode(wrapper.group, visitMap);
+		} else {
+			ret = wrapper.execNode;
+		}
+		visitMap.put(wrapper, ret);
+		return ret;
+	}
+
+	private ExecNode<?, ?> createMultipleInputNode(
+			MultipleInputGroup group,
+			Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap) {
+		// calculate the inputs of the multiple input node
+		List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs = new ArrayList<>();
+		for (ExecNodeWrapper member : group.members) {
+			for (int i = 0; i < member.inputs.size(); i++) {
+				ExecNodeWrapper memberInput = member.inputs.get(i);
+				if (group.members.contains(memberInput)) {
+					continue;
+				}
+				Preconditions.checkState(
+					visitMap.containsKey(memberInput),
+					"Input of a multiple input member is not visited. This is a bug.");
+
+				ExecNode<?, ?> inputNode = visitMap.get(memberInput);
+				ExecEdge inputEdge = member.execNode.getInputEdges().get(i);
+				inputs.add(Tuple2.of(inputNode, inputEdge));
+			}
+		}
+
+		if (isStreaming) {
+			return createStreamMultipleInputNode(group, inputs);
+		} else {
+			return createBatchMultipleInputNode(group, inputs);
+		}
+	}
+
+	private StreamExecMultipleInputNode createStreamMultipleInputNode(
+			MultipleInputGroup group,
+			List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs) {
+		RelNode outputRel = (RelNode) group.root.execNode;
+		RelNode[] inputRels = new RelNode[inputs.size()];
+		for (int i = 0; i < inputs.size(); i++) {
+			inputRels[i] = (RelNode) inputs.get(i).f0;
+		}
+
+		return new StreamExecMultipleInputNode(
+			outputRel.getCluster(),
+			outputRel.getTraitSet(),
+			inputRels,
+			outputRel);
+	}
+
+	private BatchExecMultipleInputNode createBatchMultipleInputNode(
+			MultipleInputGroup group,
+			List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs) {
+		// first calculate the input orders using InputPriorityConflictResolver
+		Set<ExecNode<?, ?>> inputSet = new HashSet<>();
+		for (Tuple2<ExecNode<?, ?>, ExecEdge> t : inputs) {
+			inputSet.add(t.f0);
+		}
+		InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+			Collections.singletonList(group.root.execNode),
+			inputSet,
+			ExecEdge.DamBehavior.BLOCKING,
+			ShuffleMode.PIPELINED);
+		Map<ExecNode<?, ?>, Integer> inputOrderMap = resolver.calculateInputOrder();
+
+		// then create input rels and edges with the input orders
+		RelNode outputRel = (RelNode) group.root.execNode;
+		RelNode[] inputRels = new RelNode[inputs.size()];
+		ExecEdge[] inputEdges = new ExecEdge[inputs.size()];
+		for (int i = 0; i < inputs.size(); i++) {
+			ExecNode<?, ?> inputNode = inputs.get(i).f0;
+			ExecEdge originalInputEdge = inputs.get(i).f1;
+			inputRels[i] = (RelNode) inputNode;
+			inputEdges[i] = ExecEdge.builder()
+				.requiredShuffle(originalInputEdge.getRequiredShuffle())
+				.damBehavior(originalInputEdge.getDamBehavior())
+				.priority(inputOrderMap.get(inputNode))
+				.build();
+		}
+
+		return new BatchExecMultipleInputNode(
+			outputRel.getCluster(),
+			outputRel.getTraitSet(),
+			inputRels,
+			outputRel,
+			inputEdges);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Helper Classes
+	// --------------------------------------------------------------------------------
+
+	private static class ExecNodeWrapper {
+		private final ExecNode<?, ?> execNode;
+		private final List<ExecNodeWrapper> inputs;
+		private final List<ExecNodeWrapper> outputs;
+		private MultipleInputGroup group;
+
+		private ExecNodeWrapper(ExecNode<?, ?> execNode) {
+			this.execNode = execNode;
+			this.inputs = new ArrayList<>();
+			this.outputs = new ArrayList<>();
+			this.group = null;
+		}
+
+		private void createGroup() {
+			this.group = new MultipleInputGroup(this);
+		}
+
+		private void addToGroup(MultipleInputGroup group) {

Review comment:
       We expect this wrapper not to be in any group.
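To illustrate the invariant this comment asks for, here is a simplified sketch of the sink-to-source grouping pass, with an `addToGroup` that rejects a wrapper that is already in a group. It is an assumption-laden illustration rather than the PR's code: `Wrapper`, `Group`, and `GroupBuilder` are hypothetical, exchanges are flagged with a boolean instead of `instanceof` checks, and the optimization pass is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical group of wrappers that would become one multiple input node. */
final class Group {
    final Wrapper root;
    final Set<Wrapper> members = new LinkedHashSet<>();

    Group(Wrapper root) {
        this.root = root;
        this.members.add(root);
    }
}

/** Hypothetical, simplified stand-in for ExecNodeWrapper. */
final class Wrapper {
    final String name;
    final boolean isExchange;
    final List<Wrapper> inputs = new ArrayList<>();
    final List<Wrapper> outputs = new ArrayList<>();
    Group group;

    Wrapper(String name, boolean isExchange) {
        this.name = name;
        this.isExchange = isExchange;
    }

    static void connect(Wrapper input, Wrapper output) {
        output.inputs.add(input);
        input.outputs.add(output);
    }

    void createGroup() {
        checkNotInGroup();
        group = new Group(this);
    }

    void addToGroup(Group target) {
        checkNotInGroup();
        group = target;
        target.members.add(this);
    }

    // the invariant from the review: a wrapper is assigned to at most one group
    private void checkNotInGroup() {
        if (group != null) {
            throw new IllegalStateException(name + " is already in a multiple input group");
        }
    }
}

final class GroupBuilder {

    /** Kahn's algorithm started from the sinks: a node is emitted once all its outputs are. */
    static List<Wrapper> topologicalSort(List<Wrapper> sinks) {
        List<Wrapper> result = new ArrayList<>();
        Queue<Wrapper> queue = new ArrayDeque<>(sinks);
        Map<Wrapper, Integer> visitCount = new HashMap<>();
        while (!queue.isEmpty()) {
            Wrapper wrapper = queue.poll();
            result.add(wrapper);
            for (Wrapper input : wrapper.inputs) {
                int count = visitCount.merge(input, 1, Integer::sum);
                if (count == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }

    /** Visits wrappers from sinks to sources, joining the outputs' group or starting a new one. */
    static void createGroups(List<Wrapper> ordered) {
        for (Wrapper wrapper : ordered) {
            if (wrapper.inputs.isEmpty() || wrapper.isExchange) {
                continue; // sources and exchanges never join a group
            }
            Group outputGroup = sharedOutputGroup(wrapper);
            if (outputGroup != null) {
                wrapper.addToGroup(outputGroup);
            } else if (wrapper.inputs.size() >= 2) {
                wrapper.createGroup();
            }
        }
    }

    private static Group sharedOutputGroup(Wrapper wrapper) {
        if (wrapper.outputs.isEmpty()) {
            return null;
        }
        Group first = wrapper.outputs.get(0).group;
        for (Wrapper output : wrapper.outputs) {
            if (output.group == null || output.group != first) {
                return null;
            }
        }
        return first;
    }
}
```

Failing fast in `addToGroup` turns a silent regrouping bug into an immediate `IllegalStateException`, which is the behavior the comment describes.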


[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * ddbf38d29507b98186d2b1a22716e7fb9593f5db Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8091) 
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] TsReaper commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r512391340



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecMultipleInputNode.scala
##########
@@ -99,6 +100,14 @@ class BatchExecMultipleInputNode(
     val memoryKB = generator.getManagedMemoryWeight
     ExecNode.setManagedMemoryWeight(multipleInputTransform, memoryKB * 1024)
 
+    if (withSourceChaining) {
+      // set chaining strategy for source chaining
+      multipleInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES)

Review comment:
       It's OK. I just want to be more precise.


[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098) 
   * 02e0db125833bf12ab5856b08a82a03fc5454958 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8276) 

[GitHub] [flink] godfreyhe merged pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
godfreyhe merged pull request #13742:
URL: https://github.com/apache/flink/pull/13742


[GitHub] [flink] flinkbot commented on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 37cba38abc15a3ee7d1193644be564c0c75ac4c1 UNKNOWN

[GitHub] [flink] godfreyhe commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r512657810



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processors/utils/TopologyGraph.java
##########
@@ -100,6 +100,9 @@ void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
 	 * Calculate the maximum distance of the currently added nodes from the nodes without inputs.
 	 * The smallest distance is 0 (which are exactly the nodes without inputs) and the distances of
 	 * other nodes are the largest distances in their inputs plus 1.
+	 *
+	 * <p>Distance of a node is defined as the number of edges one needs to go through from the
+	 * nodes without inputs to this node.
 	 */
 	Map<ExecNode<?, ?>, Integer> calculateDistance() {

Review comment:
       rename to calculateMaxDistance
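The distance calculation under review is a longest-path computation over a DAG driven by Kahn's algorithm: a node is processed only after all of its inputs have been, so its distance is the maximum input distance plus one. The sketch below uses the suggested name but is hypothetical: `MaxDistance` works on a plain `Map` from each node to its inputs rather than on `TopologyNode`s, and it assumes every node appears as a key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical helper mirroring TopologyGraph#calculateDistance on a plain adjacency map. */
final class MaxDistance {

    /**
     * @param inputs maps every node to the list of its input nodes (every node must be a key)
     * @return for each node, the largest number of edges on any path from a node without inputs
     */
    static <N> Map<N, Integer> calculateMaxDistance(Map<N, List<N>> inputs) {
        // derive output lists so a node can be released once all of its inputs are done
        Map<N, List<N>> outputs = new HashMap<>();
        for (N node : inputs.keySet()) {
            outputs.putIfAbsent(node, new ArrayList<>());
        }
        for (Map.Entry<N, List<N>> e : inputs.entrySet()) {
            for (N input : e.getValue()) {
                outputs.computeIfAbsent(input, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        Map<N, Integer> result = new HashMap<>();
        Map<N, Integer> inputsVisited = new HashMap<>();
        Queue<N> queue = new ArrayDeque<>();
        for (Map.Entry<N, List<N>> e : inputs.entrySet()) {
            if (e.getValue().isEmpty()) {
                queue.offer(e.getKey()); // nodes without inputs have distance 0
            }
        }

        while (!queue.isEmpty()) {
            N node = queue.poll();
            int dist = -1;
            for (N input : inputs.get(node)) {
                dist = Math.max(dist, result.get(input)); // every input is already finished
            }
            result.put(node, dist + 1);
            for (N output : outputs.get(node)) {
                int visited = inputsVisited.merge(output, 1, Integer::sum);
                if (visited == inputs.get(output).size()) {
                    queue.offer(output);
                }
            }
        }
        return result;
    }
}
```

With inputs `b <- a`, `c <- b`, `d <- a, c`, and `f <- e, c`, the nodes `a` and `e` get distance 0, `b` gets 1, `c` gets 2, and both `d` and `f` get 3, because the longest path wins over the shorter direct edge.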


[GitHub] [flink] TsReaper commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r511700500



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see the appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node.
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(sinkWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> sinkNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		sinkNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> sink : sinkNodes) {
+			ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+			Preconditions.checkNotNull(sinkWrapper, "Sink node is not wrapped. This is a bug.");
+			sinkWrappers.add(sinkWrapper);
+		}
+		return sinkWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper to the same group as its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// all our attempts failed, this node will not be in a multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be a member of multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof BatchExecExchange) {
+			// exchange cannot be a member of multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof StreamExecExchange) {
+			// exchange cannot be a member of multiple input node
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * A node can only be assigned to the same multiple input group as its outputs
+	 * if all of its outputs belong to one and the same group.
+	 *
+	 * @return the {@link MultipleInputGroup} of the outputs if all outputs belong to
+	 *         the same group, null otherwise
+	 */
+	private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+		if (wrapper.outputs.isEmpty()) {
+			return null;
+		}
+
+		MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+		if (outputGroup == null) {
+			return null;
+		}
+
+		for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+			if (outputWrapper.group != outputGroup) {
+				return null;
+			}
+		}
+
+		return outputGroup;
+	}
+
+	private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		// only a node with more than one input can be the root,
+		// as chaining of one-input operators is already handled by operator chains
+		return wrapper.inputs.size() >= 2;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Optimizing
+	// --------------------------------------------------------------------------------
+
+	private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sources to sinks
+		for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+			ExecNodeWrapper wrapper = orderedWrappers.get(i);
+			MultipleInputGroup group = wrapper.group;
+			if (group == null) {
+				// we only consider nodes currently in a multiple input group
+				continue;
+			}
+
+			boolean isUnion =
+				wrapper.execNode instanceof BatchExecUnion || wrapper.execNode instanceof StreamExecUnion;
+
+			if (group.members.size() == 1) {
+				Preconditions.checkState(
+					wrapper == group.root,
+					"The only member of a multiple input group is not its root. This is a bug.");
+				// optimization 1. we clean up multiple input groups with only 1 member,
+				// unless one of its inputs is a FLIP-27 source (to maximize source chaining);
+				// however, this FLIP-27 exception does not apply to unions, as they're not real operators
+				if (isUnion || wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode))) {
+					wrapper.removeFromGroup();
+				}
+				continue;
+			}
+
+			if (!isTailOfMultipleInputGroup(wrapper)) {
+				// we're not removing a node from the middle of a multiple input group
+				continue;
+			}
+
+			boolean shouldRemove = false;
+			if (isUnion) {
+				// optimization 2. we do not allow a union to be the tail of a multiple input
+				// group, as it costs extra function calls, unless one of the united
+				// inputs is a FLIP-27 source
+				shouldRemove = wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode));
+			} else if (wrapper.inputs.size() == 1) {
+				// optimization 3. a one-input operator is removed unless its input
+				// is an exchange or a FLIP-27 source. This is mainly to avoid the following
+				// pattern:
+				// non-chainable source -> calc --\
+				//                                 join ->
+				// non-chainable source -> calc --/
+				// if we moved the two calcs into the multiple input group rooted at the join,
+				// we would be directly shuffling a large amount of records from the sources
+				// without first filtering them by the calcs
+				ExecNode<?, ?> input = wrapper.inputs.get(0).execNode;
+				shouldRemove = !(input instanceof BatchExecExchange) &&
+					!(input instanceof StreamExecExchange) &&
+					!isNewSource(input);
+			}
+
+			// optimization 4. singleton operations (for example a singleton global agg)
+			// are not included in the multiple input node, as otherwise we would have to
+			// ensure that the whole multiple input node has a parallelism of 1.
+			// Consecutive singleton operations connected by a forward shuffle are dealt
+			// with together with optimization 3
+			shouldRemove |= wrapper.inputs.stream().anyMatch(inputWrapper ->
+				inputWrapper.execNode instanceof BatchExecExchange &&
+					((BatchExecExchange) inputWrapper.execNode)
+						.distribution.getType() == RelDistribution.Type.SINGLETON);
+
+			if (shouldRemove) {
+				wrapper.removeFromGroup();
+			}
+		}
+	}
+
+	private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {

Review comment:
       I actually don't like the terms `head` and `tail` for the multiple input operator, as they're ambiguous. Why don't we use terms like `input`, `entrance` and `root`?
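The sorting and grouping passes in `process` can be illustrated on a toy plan. This is a simplified sketch, not the PR's code: `GroupingSketch`, `Wrapper`, `Group` and the `isExchange` flag are hypothetical stand-ins for `ExecNodeWrapper`, `MultipleInputGroup` and the exchange `instanceof` checks. It sorts sinks-first with Kahn's algorithm (a node is enqueued once all of its outputs have been emitted) and then applies the three rules of `createMultipleInputGroups`: skip sources and exchanges, join the outputs' group if all outputs share one, otherwise start a new group only at a node with two or more inputs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified model of the grouping pass; not Flink's actual classes.
class GroupingSketch {

    static final class Group {
        final Wrapper root;
        final List<Wrapper> members = new ArrayList<>();

        Group(Wrapper root) {
            this.root = root;
        }
    }

    static final class Wrapper {
        final String name;
        final boolean isExchange;
        final List<Wrapper> inputs = new ArrayList<>();
        final List<Wrapper> outputs = new ArrayList<>();
        Group group; // null while the node is not in any multiple input group

        Wrapper(String name, boolean isExchange) {
            this.name = name;
            this.isExchange = isExchange;
        }
    }

    /** Connects {@code from} as an input of {@code to}. */
    static void link(Wrapper from, Wrapper to) {
        to.inputs.add(from);
        from.outputs.add(to);
    }

    /** Kahn's algorithm run from the sinks: a node is emitted once all of its outputs are. */
    static List<Wrapper> topologicalSort(List<Wrapper> sinks) {
        List<Wrapper> result = new ArrayList<>();
        Queue<Wrapper> queue = new ArrayDeque<>(sinks);
        Map<Wrapper, Integer> visitCount = new HashMap<>();
        while (!queue.isEmpty()) {
            Wrapper w = queue.poll();
            result.add(w);
            for (Wrapper input : w.inputs) {
                if (visitCount.merge(input, 1, Integer::sum) == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }

    /** Visits nodes sinks-first and assigns them to groups. */
    static void createGroups(List<Wrapper> ordered) {
        for (Wrapper w : ordered) {
            if (w.inputs.isEmpty() || w.isExchange) {
                continue; // sources and exchanges never become members
            }
            Group shared = sharedOutputGroup(w);
            if (shared != null) {
                w.group = shared;
                shared.members.add(w);
            } else if (w.inputs.size() >= 2) {
                w.group = new Group(w); // this node becomes the root of a new group
                w.group.members.add(w);
            }
        }
    }

    /** Returns the group shared by all outputs, or null if there is none. */
    static Group sharedOutputGroup(Wrapper w) {
        if (w.outputs.isEmpty()) {
            return null;
        }
        Group g = w.outputs.get(0).group;
        for (Wrapper out : w.outputs) {
            if (out.group != g) {
                return null;
            }
        }
        return g;
    }
}
```

For a plan `src1 -> calc1 -> join`, `src2 -> exchange -> calc2 -> join`, `join -> sink`, the join becomes a root and both calcs join its group, while the sink (one input), the sources and the exchange stay outside. Assuming `src1` is not a FLIP-27 source, optimization 3 in the diff above would later evict `calc1`, whose input is a plain source rather than an exchange.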







[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 1121f5b1525294821397a62519e33f21c01a097a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8287) 
   





[GitHub] [flink] TsReaper commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r511700192



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+	private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		Preconditions.checkNotNull(
+			wrapper.group,
+			"Exec node wrapper does not have a multiple input group. This is a bug.");
+		for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+			if (inputWrapper.group == wrapper.group) {
+				// one of the input is in the same group, so this node is not the tail of the group
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private boolean isNewSource(ExecNode<?, ?> node) {

Review comment:
       FLIP-146 was not ready when this PR was submitted. I'll rebase onto the master branch.
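The `isTailOfMultipleInputGroup` check shown in this hunk, combined with optimization 3 from the same file, can be sketched as follows. The sketch is hypothetical: `TailSketch`, `Node` and the `Kind` enum are illustrative names, group identity is modeled as a plain `Object`, unions and singleton exchanges (optimizations 2 and 4) are left out, and FLIP-27 source detection is reduced to a `NEW_SOURCE` kind because the body of `isNewSource` is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model holding only what the tail check and optimization 3 need.
class TailSketch {

    enum Kind { SOURCE, NEW_SOURCE, EXCHANGE, OPERATOR }

    static final class Node {
        final Kind kind;
        final Object group; // identity of the multiple input group, or null
        final List<Node> inputs = new ArrayList<>();

        Node(Kind kind, Object group, Node... inputs) {
            this.kind = kind;
            this.group = group;
            for (Node input : inputs) {
                this.inputs.add(input);
            }
        }
    }

    /** A group member is a tail if none of its inputs belongs to the same group. */
    static boolean isTail(Node node) {
        if (node.group == null) {
            throw new IllegalStateException("node is not in a multiple input group");
        }
        for (Node input : node.inputs) {
            if (input.group == node.group) {
                return false;
            }
        }
        return true;
    }

    /**
     * Optimization 3: a one-input tail is evicted unless it reads from an exchange
     * or a FLIP-27 source, so filtering operators stay chained to plain sources.
     */
    static boolean shouldEvictOneInputTail(Node node) {
        if (node.inputs.size() != 1 || !isTail(node)) {
            return false;
        }
        Kind inputKind = node.inputs.get(0).kind;
        return inputKind != Kind.EXCHANGE && inputKind != Kind.NEW_SOURCE;
    }
}
```

Because the PR visits nodes from sources to sinks, evicting a tail can expose the next member as a new tail, so the group shrinks from its inputs toward its root.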







[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 37cba38abc15a3ee7d1193644be564c0c75ac4c1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8084) 
   * ddbf38d29507b98186d2b1a22716e7fb9593f5db Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8091) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 37cba38abc15a3ee7d1193644be564c0c75ac4c1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8084) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot commented on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714288929


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 37cba38abc15a3ee7d1193644be564c0c75ac4c1 (Thu Oct 22 07:22:28 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>



[GitHub] [flink] TsReaper commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r511699281



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see the appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node.
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups

Review comment:
       They're nice-to-have, not a must.
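
The `topologicalSort` step called in `process()` above orders nodes so that sinks come first and sources come last. As a hedged, self-contained sketch of that counting scheme (Kahn's algorithm walked over output edges), with `DagNode` as a hypothetical stand-in for `ExecNodeWrapper` rather than a planner class:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for ExecNodeWrapper: only the input/output links matter here.
final class DagNode {
    final String name;
    final List<DagNode> inputs = new ArrayList<>();
    final List<DagNode> outputs = new ArrayList<>();

    DagNode(String name) {
        this.name = name;
    }

    // Registers `input` as an input of this node and this node as an output of `input`.
    void addInput(DagNode input) {
        inputs.add(input);
        input.outputs.add(this);
    }
}

final class SinkFirstTopologicalSort {
    // Kahn's algorithm walked from sinks towards sources: a node is enqueued only
    // once every one of its outputs (consumers) has already been emitted.
    static List<DagNode> sort(List<DagNode> sinks) {
        List<DagNode> result = new ArrayList<>();
        Queue<DagNode> queue = new LinkedList<>(sinks);
        Map<DagNode, Integer> visitCount = new HashMap<>();
        while (!queue.isEmpty()) {
            DagNode node = queue.poll();
            result.add(node);
            for (DagNode input : node.inputs) {
                // count how many consumers of `input` have been emitted so far
                int count = visitCount.merge(input, 1, Integer::sum);
                if (count == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }
}
```

For a diamond `source -> {a, b} -> sink`, this yields `sink, a, b, source`: `source` is enqueued only after both `a` and `b` have been emitted. That ordering is what the sink-to-source grouping pass relies on, since it decides a node's group by looking at its outputs.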





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cba38abc15a3ee7d1193644be564c0c75ac4c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8084",
       "triggerID" : "37cba38abc15a3ee7d1193644be564c0c75ac4c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ddbf38d29507b98186d2b1a22716e7fb9593f5db",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8091",
       "triggerID" : "ddbf38d29507b98186d2b1a22716e7fb9593f5db",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3c50597400946ab21e7dba24673a0b5df30eaf2b",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098",
       "triggerID" : "3c50597400946ab21e7dba24673a0b5df30eaf2b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "02e0db125833bf12ab5856b08a82a03fc5454958",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8276",
       "triggerID" : "02e0db125833bf12ab5856b08a82a03fc5454958",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1121f5b1525294821397a62519e33f21c01a097a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1121f5b1525294821397a62519e33f21c01a097a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098) 
   * 02e0db125833bf12ab5856b08a82a03fc5454958 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8276) 
   * 1121f5b1525294821397a62519e33f21c01a097a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cba38abc15a3ee7d1193644be564c0c75ac4c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8084",
       "triggerID" : "37cba38abc15a3ee7d1193644be564c0c75ac4c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ddbf38d29507b98186d2b1a22716e7fb9593f5db",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8091",
       "triggerID" : "ddbf38d29507b98186d2b1a22716e7fb9593f5db",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3c50597400946ab21e7dba24673a0b5df30eaf2b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098",
       "triggerID" : "3c50597400946ab21e7dba24673a0b5df30eaf2b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ddbf38d29507b98186d2b1a22716e7fb9593f5db Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8091) 
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cba38abc15a3ee7d1193644be564c0c75ac4c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8084",
       "triggerID" : "37cba38abc15a3ee7d1193644be564c0c75ac4c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ddbf38d29507b98186d2b1a22716e7fb9593f5db",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8091",
       "triggerID" : "ddbf38d29507b98186d2b1a22716e7fb9593f5db",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3c50597400946ab21e7dba24673a0b5df30eaf2b",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098",
       "triggerID" : "3c50597400946ab21e7dba24673a0b5df30eaf2b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "02e0db125833bf12ab5856b08a82a03fc5454958",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "02e0db125833bf12ab5856b08a82a03fc5454958",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3c50597400946ab21e7dba24673a0b5df30eaf2b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8098) 
   * 02e0db125833bf12ab5856b08a82a03fc5454958 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] godfreyhe commented on a change in pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r510718313



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
##########
@@ -26,20 +26,20 @@ import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOper
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
+import org.apache.flink.table.planner.plan.nodes.process.{DAGProcessContext, DAGProcessor}
 import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
-import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor
+import org.apache.flink.table.planner.plan.reuse.{DeadlockBreakupProcessor, MultipleInputNodeCreationProcessor}
 import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.planner.sinks.{BatchSelectTableSink, SelectTableSinkBase}
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
-
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
 import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
 import org.apache.calcite.sql.SqlExplainLevel
-
 import java.util
 
+import org.apache.flink.table.api.config.OptimizerConfigOptions

Review comment:
       reorder the import

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
##########
@@ -99,4 +99,12 @@
 		key("table.optimizer.join-reorder-enabled")
 			.defaultValue(false)
 			.withDescription("Enables join reorder in optimizer. Default is disabled.");
+
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+	public static final ConfigOption<Boolean> TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED =
+		key("table.optimizer.multiple-input-enabled")
+			.defaultValue(false)
+			.withDescription("Enables creating multiple input nodes to reduce shuffling. " +

Review comment:
       nodes -> operators

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see the appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node.
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(sinkWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> sinkNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		sinkNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> sink : sinkNodes) {
+			ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+			Preconditions.checkNotNull(sinkWrapper, "Sink node is not wrapped. This is a bug.");
+			sinkWrappers.add(sinkWrapper);
+		}
+		return sinkWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper into the same group with its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// all our attempts failed, this node will not be in a multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be members of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof BatchExecExchange) {
+			// an exchange cannot be a member of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof StreamExecExchange) {
+			// an exchange cannot be a member of a multiple input node
+			return false;
+		}

Review comment:
       `wrapper.execNode instanceof Exchange`
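
The suggestion is to replace the two exchange checks with a single `instanceof` test on a type both exchanges share. In the blink planner, `BatchExecExchange` and `StreamExecExchange` presumably both descend from Calcite's `Exchange`; that is an assumption worth checking against the class hierarchy. The self-contained sketch below uses hypothetical stand-in classes, not the planner's types, to show the shape of the merged check:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the planner's exec nodes (not Flink classes).
abstract class PlanNode {
    abstract List<PlanNode> inputs();
}

// Shared supertype of both exchange variants.
abstract class Exchange extends PlanNode {
    private final PlanNode input;

    Exchange(PlanNode input) {
        this.input = input;
    }

    @Override
    List<PlanNode> inputs() {
        return Collections.singletonList(input);
    }
}

final class BatchExchange extends Exchange {
    BatchExchange(PlanNode input) {
        super(input);
    }
}

final class StreamExchange extends Exchange {
    StreamExchange(PlanNode input) {
        super(input);
    }
}

final class Scan extends PlanNode {
    @Override
    List<PlanNode> inputs() {
        return Collections.emptyList();
    }
}

final class Calc extends PlanNode {
    private final PlanNode input;

    Calc(PlanNode input) {
        this.input = input;
    }

    @Override
    List<PlanNode> inputs() {
        return Collections.singletonList(input);
    }
}

final class MembershipCheck {
    // Same shape as canBeMultipleInputNodeMember; the method's remaining checks are omitted.
    static boolean canBeMultipleInputNodeMember(PlanNode node) {
        if (node.inputs().isEmpty()) {
            // sources cannot be members of a multiple input node
            return false;
        }
        if (node instanceof Exchange) {
            // one check now covers both the batch and the streaming exchange
            return false;
        }
        return true;
    }
}
```

Checking against the common supertype also means a future exchange variant is excluded automatically instead of needing another branch.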

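`createMultipleInputGroups` in the hunk above is a greedy pass: visiting nodes from sinks to sources, each eligible node first tries to join the group of its outputs and otherwise tries to start a new group. A minimal sketch of that control flow follows; the types are hypothetical, and the join and root rules are deliberately simplified assumptions, not the conditions from the design doc:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical node type for this sketch; `eligible` stands in for canBeMultipleInputNodeMember.
final class GNode {
    final String name;
    final boolean eligible;
    final List<GNode> inputs = new ArrayList<>();
    final List<GNode> outputs = new ArrayList<>();
    Group group; // null while the node belongs to no group

    GNode(String name, boolean eligible) {
        this.name = name;
        this.eligible = eligible;
    }

    void addInput(GNode input) {
        inputs.add(input);
        input.outputs.add(this);
    }
}

final class Group {
    final GNode root;
    final List<GNode> members = new ArrayList<>();

    Group(GNode root) {
        this.root = root;
        members.add(root);
    }
}

final class GreedyGrouping {
    // Simplified, assumed rule: join the outputs' group only if all outputs share one group.
    static Group commonOutputGroup(GNode node) {
        Set<Group> groups = new HashSet<>();
        for (GNode output : node.outputs) {
            groups.add(output.group); // null is added for outputs outside any group
        }
        return groups.size() == 1 ? groups.iterator().next() : null;
    }

    // `ordered` must list nodes from sinks to sources, e.g. from the topological sort.
    static List<Group> createGroups(List<GNode> ordered) {
        List<Group> created = new ArrayList<>();
        for (GNode node : ordered) {
            if (!node.eligible) {
                continue; // e.g. sources and exchanges
            }
            Group outputGroup = commonOutputGroup(node);
            if (outputGroup != null) {
                node.group = outputGroup;
                outputGroup.members.add(node);
                continue;
            }
            // Simplified, assumed rule: any eligible node may root a new group.
            Group group = new Group(node);
            node.group = group;
            created.add(group);
        }
        return created;
    }
}
```

For the chain `scan -> calc -> exchange -> agg -> sink`, this produces one group rooted at `sink` that also contains `agg`, plus a single-member group rooted at `calc`, because the exchange splits the chain. The real `canBeInSameGroupWithOutputs` and `canBeRootOfMultipleInputGroup` are not shown in this hunk and likely apply further conditions, and the subsequent `optimizeMultipleInputGroups` step, which removes unnecessary nodes from groups, is not modeled here.
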
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see the appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node.
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(sinkWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> sinkNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		sinkNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> sink : sinkNodes) {
+			ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+			Preconditions.checkNotNull(sinkWrapper, "Sink node is not wrapped. This is a bug.");
+			sinkWrappers.add(sinkWrapper);
+		}
+		return sinkWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper to the same group as its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// if neither attempt above succeeded, this node will not be in a multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be a member of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof BatchExecExchange) {
+			// an exchange cannot be a member of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof StreamExecExchange) {
+			// an exchange cannot be a member of a multiple input node
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * A node can only be assigned to the multiple input group of its outputs
+	 * if all of its outputs belong to the same (non-null) group.
+	 *
+	 * @return the {@link MultipleInputGroup} of the outputs if all outputs have a
+	 *         group and are the same, null otherwise
+	 */
+	private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+		if (wrapper.outputs.isEmpty()) {
+			return null;
+		}
+
+		MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+		if (outputGroup == null) {
+			return null;
+		}
+
+		for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+			if (outputWrapper.group != outputGroup) {
+				return null;
+			}
+		}
+
+		return outputGroup;
+	}
+
+	private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		// only a node with more than one input can be the root,
+		// as chains of one-input operators are already handled by operator chaining
+		return wrapper.inputs.size() >= 2;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Optimizing
+	// --------------------------------------------------------------------------------
+
+	private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sources to sinks
+		for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+			ExecNodeWrapper wrapper = orderedWrappers.get(i);
+			MultipleInputGroup group = wrapper.group;
+			if (group == null) {
+				// we only consider nodes currently in a multiple input group
+				continue;
+			}
+
+			boolean isUnion =
+				wrapper.execNode instanceof BatchExecUnion || wrapper.execNode instanceof StreamExecUnion;
+
+			if (group.members.size() == 1) {
+				Preconditions.checkState(
+					wrapper == group.root,
+					"The only member of a multiple input group is not its root. This is a bug.");
+				// optimization 1. we clean up multiple input groups with only 1 member,
+				// unless one of its inputs is a FLIP-27 source (to maximize source chaining);
+				// this exception does not apply to unions because they're not real operators
+				if (isUnion || wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode))) {
+					wrapper.removeFromGroup();
+				}
+				continue;
+			}
+
+			if (!isTailOfMultipleInputGroup(wrapper)) {
+				// we're not removing a node from the middle of a multiple input group
+				continue;
+			}
+
+			boolean shouldRemove = false;
+			if (isUnion) {
+				// optimization 2. we do not allow a union to be the tail of a multiple input
+				// group, as this costs extra function calls, unless one of the union's
+				// inputs is a FLIP-27 source
+				shouldRemove = wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode));
+			} else if (wrapper.inputs.size() == 1) {
+				// optimization 3. for one-input operators we'll remove it unless its input
+				// is an exchange or a FLIP-27 source, this is mainly to avoid the following
+				// pattern:
+				// non-chainable source -> calc --\
+				//                                 join ->
+				// non-chainable source -> calc --/
+				// if we move two calcs into the multiple input group rooted at the join, we're
+				// directly shuffling a large amount of records from the source without first
+				// filtering them by the calc
+				ExecNode<?, ?> input = wrapper.inputs.get(0).execNode;
+				shouldRemove = !(input instanceof BatchExecExchange) &&
+					!(input instanceof StreamExecExchange) &&
+					!isNewSource(input);
+			}
+
+			// optimization 4. singleton operations (for example a singleton global agg)
+			// are not included in the multiple input node, as otherwise we would have to
+			// ensure that the whole multiple input node has a parallelism of 1.
+			// consecutive singleton operations connected by a forward shuffle are handled
+			// by optimization 3
+			shouldRemove |= wrapper.inputs.stream().anyMatch(inputWrapper ->
+				inputWrapper.execNode instanceof BatchExecExchange &&
+					((BatchExecExchange) inputWrapper.execNode)
+						.distribution.getType() == RelDistribution.Type.SINGLETON);
+
+			if (shouldRemove) {
+				wrapper.removeFromGroup();
+			}
+		}
+	}
+
+	private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		Preconditions.checkNotNull(
+			wrapper.group,
+			"Exec node wrapper does not have a multiple input group. This is a bug.");
+		for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+			if (inputWrapper.group == wrapper.group) {
+				// one of the inputs is in the same group, so this node is not the tail of the group
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private boolean isNewSource(ExecNode<?, ?> node) {
+		if (node instanceof BatchExecBoundedStreamScan) {
+			BatchExecBoundedStreamScan scan = (BatchExecBoundedStreamScan) node;
+			return scan.boundedStreamTable().dataStream().getTransformation() instanceof SourceTransformation;
+		} else if (node instanceof StreamExecDataStreamScan) {
+			StreamExecDataStreamScan scan = (StreamExecDataStreamScan) node;
+			return scan.dataStreamTable().dataStream().getTransformation() instanceof SourceTransformation;
+		}
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Nodes Creating
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNode<?, ?>> createMultipleInputNodes(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNode<?, ?>> result = new ArrayList<>();
+		Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap = new HashMap<>();
+		for (ExecNodeWrapper sinkWrapper : sinkWrappers) {
+			result.add(getMultipleInputNode(sinkWrapper, visitMap));
+		}
+		return result;
+	}
+
+	private ExecNode<?, ?> getMultipleInputNode(
+			ExecNodeWrapper wrapper,
+			Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap) {
+		if (visitMap.containsKey(wrapper)) {
+			return visitMap.get(wrapper);
+		}
+
+		for (int i = 0; i < wrapper.inputs.size(); i++) {
+			wrapper.execNode.replaceInputNode(i, (ExecNode) getMultipleInputNode(wrapper.inputs.get(i), visitMap));
+		}
+
+		ExecNode<?, ?> ret;
+		if (wrapper.group != null && wrapper == wrapper.group.root) {
+			ret = createMultipleInputNode(wrapper.group, visitMap);
+		} else {
+			ret = wrapper.execNode;
+		}
+		visitMap.put(wrapper, ret);
+		return ret;
+	}
+
+	private ExecNode<?, ?> createMultipleInputNode(
+			MultipleInputGroup group,
+			Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap) {
+		// calculate the inputs of the multiple input node
+		List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs = new ArrayList<>();
+		for (ExecNodeWrapper member : group.members) {
+			for (int i = 0; i < member.inputs.size(); i++) {
+				ExecNodeWrapper memberInput = member.inputs.get(i);
+				if (group.members.contains(memberInput)) {
+					continue;
+				}
+				Preconditions.checkState(
+					visitMap.containsKey(memberInput),
+					"Input of a multiple input member is not visited. This is a bug.");
+
+				ExecNode<?, ?> inputNode = visitMap.get(memberInput);
+				ExecEdge inputEdge = member.execNode.getInputEdges().get(i);
+				inputs.add(Tuple2.of(inputNode, inputEdge));
+			}
+		}
+
+		if (isStreaming) {
+			return createStreamMultipleInputNode(group, inputs);
+		} else {
+			return createBatchMultipleInputNode(group, inputs);
+		}
+	}
+
+	private StreamExecMultipleInputNode createStreamMultipleInputNode(
+			MultipleInputGroup group,
+			List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs) {
+		RelNode outputRel = (RelNode) group.root.execNode;
+		RelNode[] inputRels = new RelNode[inputs.size()];
+		for (int i = 0; i < inputs.size(); i++) {
+			inputRels[i] = (RelNode) inputs.get(i).f0;
+		}
+
+		return new StreamExecMultipleInputNode(
+			outputRel.getCluster(),
+			outputRel.getTraitSet(),
+			inputRels,
+			outputRel);
+	}
+
+	private BatchExecMultipleInputNode createBatchMultipleInputNode(
+			MultipleInputGroup group,
+			List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs) {
+		// first calculate the input orders using InputPriorityConflictResolver
+		Set<ExecNode<?, ?>> inputSet = new HashSet<>();
+		for (Tuple2<ExecNode<?, ?>, ExecEdge> t : inputs) {
+			inputSet.add(t.f0);
+		}
+		InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+			Collections.singletonList(group.root.execNode),
+			inputSet,
+			ExecEdge.DamBehavior.BLOCKING,
+			ShuffleMode.PIPELINED);
+		Map<ExecNode<?, ?>, Integer> inputOrderMap = resolver.calculateInputOrder();
+
+		// then create input rels and edges with the input orders
+		RelNode outputRel = (RelNode) group.root.execNode;
+		RelNode[] inputRels = new RelNode[inputs.size()];
+		ExecEdge[] inputEdges = new ExecEdge[inputs.size()];
+		for (int i = 0; i < inputs.size(); i++) {
+			ExecNode<?, ?> inputNode = inputs.get(i).f0;
+			ExecEdge originalInputEdge = inputs.get(i).f1;
+			inputRels[i] = (RelNode) inputNode;
+			inputEdges[i] = ExecEdge.builder()
+				.requiredShuffle(originalInputEdge.getRequiredShuffle())
+				.damBehavior(originalInputEdge.getDamBehavior())
+				.priority(inputOrderMap.get(inputNode))
+				.build();
+		}
+
+		return new BatchExecMultipleInputNode(
+			outputRel.getCluster(),
+			outputRel.getTraitSet(),
+			inputRels,
+			outputRel,
+			inputEdges);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Helper Classes
+	// --------------------------------------------------------------------------------
+
+	private static class ExecNodeWrapper {
+		private final ExecNode<?, ?> execNode;
+		private final List<ExecNodeWrapper> inputs;
+		private final List<ExecNodeWrapper> outputs;
+		private MultipleInputGroup group;
+
+		private ExecNodeWrapper(ExecNode<?, ?> execNode) {
+			this.execNode = execNode;
+			this.inputs = new ArrayList<>();
+			this.outputs = new ArrayList<>();
+			this.group = null;
+		}
+
+		private void createGroup() {
+			this.group = new MultipleInputGroup(this);
+		}
+
+		private void addToGroup(MultipleInputGroup group) {

Review comment:
       rename to `replaceGroup` ?
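The `topologicalSort` method quoted above is essentially Kahn's algorithm run backwards from the sinks: a wrapper is enqueued only after all of its outputs have been emitted, so sinks come first and sources come last. The standalone sketch below reproduces that idea with a hypothetical `Node` class in place of `ExecNodeWrapper` (it is not Flink code), and uses `Map.merge` instead of `Map.compute` for the per-node counter:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReverseTopoSortSketch {

    // Hypothetical stand-in for ExecNodeWrapper: a name plus input/output adjacency lists.
    static final class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> outputs = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node input : inputs) {
                this.inputs.add(input);
                input.outputs.add(this);
            }
        }
    }

    // Kahn's algorithm started from the sinks: a node is enqueued only once all of
    // its outputs have been emitted, so sinks come first and sources come last.
    static List<Node> topologicalSort(List<Node> sinks) {
        List<Node> result = new ArrayList<>();
        Deque<Node> queue = new ArrayDeque<>(sinks);
        Map<Node, Integer> visitCount = new HashMap<>();
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            result.add(node);
            for (Node input : node.inputs) {
                int count = visitCount.merge(input, 1, Integer::sum);
                if (count == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }

    // A diamond: one source feeds two calcs that meet again at a join, followed by a sink.
    static List<String> demoOrder() {
        Node source = new Node("source");
        Node calc1 = new Node("calc1", source);
        Node calc2 = new Node("calc2", source);
        Node join = new Node("join", calc1, calc2);
        Node sink = new Node("sink", join);
        List<String> names = new ArrayList<>();
        for (Node n : topologicalSort(Collections.singletonList(sink))) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demoOrder()); // [sink, join, calc1, calc2, source]
    }
}
```

On this diamond the shared `source` is emitted only after both calcs have been processed, which is exactly the property `createMultipleInputGroups` relies on when it looks at a node's outputs' groups.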

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups

Review comment:
       Are these optimizations necessary, or would the result still be correct if we removed any of them?
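One way to look at this question: `createMultipleInputGroups` decides which groups can exist, and `optimizeMultipleInputGroups` only ever takes nodes out of them (it only calls `removeFromGroup`). The simplified sketch below mirrors the grouping pass plus optimization 1 alone. It assumes hypothetical `Node`/`Group` classes in place of `ExecNodeWrapper`/`MultipleInputGroup` and a boolean `exchange` flag in place of the `BatchExecExchange`/`StreamExecExchange` checks; the FLIP-27 source exception and optimizations 2 to 4 are deliberately left out, and the sink-to-source order is written by hand instead of being computed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {

    // Hypothetical stand-in for ExecNodeWrapper; `exchange` replaces the instanceof checks.
    static final class Node {
        final String name;
        final boolean exchange;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> outputs = new ArrayList<>();
        Group group;

        Node(String name, boolean exchange, Node... inputs) {
            this.name = name;
            this.exchange = exchange;
            for (Node input : inputs) {
                this.inputs.add(input);
                input.outputs.add(this);
            }
        }
    }

    // Hypothetical stand-in for MultipleInputGroup.
    static final class Group {
        final Node root;
        final List<Node> members = new ArrayList<>();

        Group(Node root) {
            this.root = root;
            members.add(root);
        }
    }

    // Sources and exchanges are never members (cf. canBeMultipleInputNodeMember).
    static boolean canBeMember(Node n) {
        return !n.inputs.isEmpty() && !n.exchange;
    }

    // The group shared by all outputs, or null (cf. canBeInSameGroupWithOutputs).
    static Group sharedOutputGroup(Node n) {
        if (n.outputs.isEmpty() || n.outputs.get(0).group == null) {
            return null;
        }
        Group g = n.outputs.get(0).group;
        for (Node out : n.outputs) {
            if (out.group != g) {
                return null;
            }
        }
        return g;
    }

    // `ordered` must list every node after all of its outputs (sinks first).
    static void createGroups(List<Node> ordered) {
        for (Node n : ordered) {
            if (!canBeMember(n)) {
                continue;
            }
            Group g = sharedOutputGroup(n);
            if (g != null) {
                n.group = g;
                g.members.add(n);
            } else if (n.inputs.size() >= 2) {
                n.group = new Group(n); // only multi-input nodes may start a group
            }
        }
    }

    // Optimization 1 only: dissolve groups whose single member is the root.
    static void dropSingletonGroups(List<Node> ordered) {
        for (int i = ordered.size() - 1; i >= 0; i--) {
            Node n = ordered.get(i);
            if (n.group != null && n.group.members.size() == 1) {
                n.group = null;
            }
        }
    }

    // Two joins; maps each inner node to the name of its group root ("none" if ungrouped).
    static Map<String, String> demo() {
        Node s1 = new Node("s1", false), s2 = new Node("s2", false);
        Node s3 = new Node("s3", false), s4 = new Node("s4", false);
        Node e1 = new Node("e1", true, s1), e2 = new Node("e2", true, s2);
        Node e3 = new Node("e3", true, s3), e4 = new Node("e4", true, s4);
        Node calc = new Node("calc", false, e1);
        Node join1 = new Node("join1", false, calc, e2);
        Node join2 = new Node("join2", false, e3, e4);
        Node sink1 = new Node("sink1", false, join1);
        Node sink2 = new Node("sink2", false, join2);
        List<Node> ordered = Arrays.asList(
            sink1, sink2, join1, join2, calc, e2, e3, e4, e1, s1, s2, s3, s4);

        createGroups(ordered);
        dropSingletonGroups(ordered);

        Map<String, String> result = new LinkedHashMap<>();
        for (Node n : Arrays.asList(join1, calc, join2)) {
            result.put(n.name, n.group == null ? "none" : n.group.root.name);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {join1=join1, calc=join1, join2=none}
    }
}
```

Here `join1` and `calc` end up in one group, while `join2`, whose inputs are both exchanges, forms a single-member group that optimization 1 dissolves.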

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolver.java
##########
@@ -97,31 +98,79 @@
 public class InputPriorityConflictResolver {
 
 	private final List<ExecNode<?, ?>> roots;
+	private final Set<ExecNode<?, ?>> boundaries;
+	private final ExecEdge.DamBehavior safeDamBehavior;
+	private final ShuffleMode shuffleMode;

Review comment:
       add some comments to explain the fields

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolver.java
##########
@@ -97,31 +98,79 @@
 public class InputPriorityConflictResolver {
 
 	private final List<ExecNode<?, ?>> roots;
+	private final Set<ExecNode<?, ?>> boundaries;
+	private final ExecEdge.DamBehavior safeDamBehavior;
+	private final ShuffleMode shuffleMode;
 
 	private TopologyGraph graph;
 
-	public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+	public InputPriorityConflictResolver(
+			List<ExecNode<?, ?>> roots,
+			Set<ExecNode<?, ?>> boundaries,
+			ExecEdge.DamBehavior safeDamBehavior,
+			ShuffleMode shuffleMode) {
 		Preconditions.checkArgument(
 			roots.stream().allMatch(root -> root instanceof BatchExecNode),
 			"InputPriorityConflictResolver can only be used for batch jobs.");
 		this.roots = roots;
+		this.boundaries = boundaries;
+		this.safeDamBehavior = safeDamBehavior;
+		this.shuffleMode = shuffleMode;
 	}
 
 	public void detectAndResolve() {
 		// build an initial topology graph
-		graph = new TopologyGraph(roots);
+		graph = new TopologyGraph(roots, boundaries);
 
 		// check and resolve conflicts about input priorities
 		AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new AbstractExecNodeExactlyOnceVisitor() {
 			@Override
 			protected void visitNode(ExecNode<?, ?> node) {
-				visitInputs(node);
+				if (!boundaries.contains(node)) {
+					visitInputs(node);
+				}
 				checkInputPriorities(node);
 			}
 		};
 		roots.forEach(n -> n.accept(inputPriorityVisitor));
 	}
 
+	public Map<ExecNode<?, ?>, Integer> calculateInputOrder() {
+		// we first calculate the topological order of all nodes in the graph
+		detectAndResolve();
+		// check that no exchange is contained in the multiple input node
+		AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				if (boundaries.contains(node)) {
+					return;
+				}
+				visitInputs(node);
+				Preconditions.checkState(
+					!(node instanceof BatchExecExchange),
+					"There is exchange in a multiple input node. This is a bug.");
+			}
+		};
+		roots.forEach(n -> n.accept(inputPriorityVisitor));

Review comment:
       Please check that the number of `roots` is always 1. If we move this part of the logic into another class, we can take `ExecNode<?, ?> root` instead of `List<ExecNode<?, ?>> roots` as a constructor parameter.
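A minimal sketch of the suggested shape, assuming a hypothetical `Node` type in place of `ExecNode` and a boolean `exchange` flag in place of `instanceof BatchExecExchange` (none of these names are Flink API): the class takes a single `root`, skips boundary nodes, visits every node at most once and fails when an exchange lies strictly inside the boundaries. It uses an explicit stack instead of the recursive `AbstractExecNodeExactlyOnceVisitor`; for this particular check the visiting order does not matter.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class SingleRootExchangeCheckSketch {

    // Hypothetical stand-in for ExecNode.
    static final class Node {
        final String name;
        final boolean exchange;
        final List<Node> inputs;

        Node(String name, boolean exchange, Node... inputs) {
            this.name = name;
            this.exchange = exchange;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Node root;
    private final Set<Node> boundaries;

    // A single root makes the "exactly one root" invariant structural
    // instead of something to check on a List at runtime.
    SingleRootExchangeCheckSketch(Node root, Set<Node> boundaries) {
        this.root = Objects.requireNonNull(root);
        this.boundaries = boundaries;
    }

    // Visits each node at most once, stops at boundaries, rejects inner exchanges.
    void checkNoExchangeInside() {
        Set<Node> visited = new HashSet<>();
        Deque<Node> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            Node node = stack.pop();
            if (boundaries.contains(node) || !visited.add(node)) {
                continue;
            }
            if (node.exchange) {
                throw new IllegalStateException("Exchange inside multiple input node: " + node.name);
            }
            for (Node input : node.inputs) {
                stack.push(input);
            }
        }
    }

    static boolean passes(Node root, Set<Node> boundaries) {
        try {
            new SingleRootExchangeCheckSketch(root, boundaries).checkNoExchangeInside();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    static boolean[] demo() {
        Node src1 = new Node("src1", false);
        Node src2 = new Node("src2", false);
        Node ex = new Node("exchange", true, src1);
        Node calc = new Node("calc", false, ex);
        Node join = new Node("join", false, calc, src2);
        // the exchange is itself an input (boundary) of the multiple input node: accepted
        boolean exchangeAsBoundary = passes(join, new HashSet<>(Arrays.asList(ex, src2)));
        // the exchange sits between the boundaries and the root: rejected
        boolean exchangeInside = passes(join, new HashSet<>(Arrays.asList(src1, src2)));
        return new boolean[] {exchangeAsBoundary, exchangeInside};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [true, false]
    }
}
```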

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolver.java
##########
@@ -97,31 +98,79 @@
 public class InputPriorityConflictResolver {
 
+	public Map<ExecNode<?, ?>, Integer> calculateInputOrder() {

Review comment:
       We should introduce another class to calculate the input orders, perhaps named `InputOrderDerivation`? Maybe we also need a common base class for `InputOrderDerivation` and `InputPriorityConflictResolver`.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);

Review comment:
       sink -> root

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes out of multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(sinkWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> sinkNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		sinkNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> sink : sinkNodes) {
+			ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+			Preconditions.checkNotNull(sinkWrapper, "Sink node is not wrapped. This is a bug.");
+			sinkWrappers.add(sinkWrapper);
+		}
+		return sinkWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper to the same group as its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// if both attempts failed, this node will not be in a multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be a member of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof BatchExecExchange) {
+			// exchanges cannot be members of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof StreamExecExchange) {
+			// exchanges cannot be members of a multiple input node
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * A node can only be assigned to the multiple input group of its outputs
+	 * if all of its outputs belong to the same group.
+	 *
+	 * @return the {@link MultipleInputGroup} of the outputs if all outputs belong
+	 *         to the same group, null otherwise
+	 */
+	private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+		if (wrapper.outputs.isEmpty()) {
+			return null;
+		}
+
+		MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+		if (outputGroup == null) {
+			return null;
+		}
+
+		for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+			if (outputWrapper.group != outputGroup) {
+				return null;
+			}
+		}
+
+		return outputGroup;
+	}
+
+	private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		// only a node with more than one input can be the root,
+		// as chaining of one-input operators is handled by operator chains
+		return wrapper.inputs.size() >= 2;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Optimizing
+	// --------------------------------------------------------------------------------
+
+	private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sources to sinks
+		for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+			ExecNodeWrapper wrapper = orderedWrappers.get(i);
+			MultipleInputGroup group = wrapper.group;
+			if (group == null) {
+				// we only consider nodes currently in a multiple input group
+				continue;
+			}
+
+			boolean isUnion =
+				wrapper.execNode instanceof BatchExecUnion || wrapper.execNode instanceof StreamExecUnion;
+
+			if (group.members.size() == 1) {
+				Preconditions.checkState(
+					wrapper == group.root,
+					"The only member of a multiple input group is not its root. This is a bug.");
+				// optimization 1. we clean up multiple input groups with only 1 member,
+				// unless one of its inputs is a FLIP-27 source (to maximize source chaining);
+				// this exception does not apply to unions because they're not real operators
+				if (isUnion || wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode))) {
+					wrapper.removeFromGroup();
+				}
+				continue;
+			}
+
+			if (!isTailOfMultipleInputGroup(wrapper)) {
+				// we're not removing a node from the middle of a multiple input group
+				continue;
+			}
+
+			boolean shouldRemove = false;
+			if (isUnion) {
+				// optimization 2. we do not allow a union to be the tail of a multiple input
+				// group, as that costs extra function calls, unless one of the union's
+				// inputs is a FLIP-27 source
+				shouldRemove = wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode));
+			} else if (wrapper.inputs.size() == 1) {
+				// optimization 3. a one-input operator is removed unless its input
+				// is an exchange or a FLIP-27 source. This mainly avoids the following
+				// pattern:
+				// non-chainable source -> calc --\
+				//                                 join ->
+				// non-chainable source -> calc --/
+				// if we moved the two calcs into the multiple input group rooted at the join,
+				// we would shuffle a large amount of records directly from the sources
+				// without filtering them by the calcs first
+				ExecNode<?, ?> input = wrapper.inputs.get(0).execNode;
+				shouldRemove = !(input instanceof BatchExecExchange) &&
+					!(input instanceof StreamExecExchange) &&
+					!isNewSource(input);
+			}
+
+			// optimization 4. singleton operations (for example a singleton global agg)
+			// are not included in the multiple input node, as that would force the whole
+			// multiple input node to have a parallelism of 1.
+			// consecutive singleton operations connected by a forward shuffle are dealt
+			// with by optimization 3
+			shouldRemove |= wrapper.inputs.stream().anyMatch(inputWrapper ->
+				inputWrapper.execNode instanceof BatchExecExchange &&
+					((BatchExecExchange) inputWrapper.execNode)
+						.distribution.getType() == RelDistribution.Type.SINGLETON);
+
+			if (shouldRemove) {
+				wrapper.removeFromGroup();
+			}
+		}
+	}
+
+	private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		Preconditions.checkNotNull(
+			wrapper.group,
+			"Exec node wrapper does not have a multiple input group. This is a bug.");
+		for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+			if (inputWrapper.group == wrapper.group) {
+				// one of the inputs is in the same group, so this node is not the tail of the group
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private boolean isNewSource(ExecNode<?, ?> node) {
+		if (node instanceof BatchExecBoundedStreamScan) {
+			BatchExecBoundedStreamScan scan = (BatchExecBoundedStreamScan) node;
+			return scan.boundedStreamTable().dataStream().getTransformation() instanceof SourceTransformation;
+		} else if (node instanceof StreamExecDataStreamScan) {
+			StreamExecDataStreamScan scan = (StreamExecDataStreamScan) node;
+			return scan.dataStreamTable().dataStream().getTransformation() instanceof SourceTransformation;
+		}
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Nodes Creating
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNode<?, ?>> createMultipleInputNodes(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNode<?, ?>> result = new ArrayList<>();
+		Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap = new HashMap<>();

Review comment:
       visitMap -> visitedMap
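The wrapping, sorting and grouping passes quoted above can be sketched in isolation as follows. This is a minimal, hypothetical model, not the PR's code: `Node` stands in for `ExecNodeWrapper`, a plain `Set` stands in for `MultipleInputGroup`, and the counter map is named `visitedOutputs` in the spirit of the rename suggested above. The sort is Kahn's algorithm run on reversed edges, so a node is emitted only after all of its outputs; grouping then applies the same three rules as `createMultipleInputGroups` (skip sources and exchanges, join the common group of the outputs, or become a root when there are at least two inputs).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MultiInputGroupingSketch {

    /** Hypothetical stand-in for ExecNodeWrapper (identity-based equality). */
    static final class Node {
        final String name;
        final boolean isExchange;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> outputs = new ArrayList<>();
        Set<Node> group; // stand-in for MultipleInputGroup, null if not grouped

        private Node(String name, boolean isExchange) {
            this.name = name;
            this.isExchange = isExchange;
        }
    }

    /** Creates a node and links it to its inputs in both directions. */
    static Node node(String name, boolean isExchange, Node... inputs) {
        Node n = new Node(name, isExchange);
        for (Node input : inputs) {
            n.inputs.add(input);
            input.outputs.add(n);
        }
        return n;
    }

    /** Kahn's algorithm on reversed edges: a node is emitted once all its outputs are emitted. */
    static List<Node> topologicalSort(List<Node> sinks) {
        List<Node> result = new ArrayList<>();
        Deque<Node> queue = new ArrayDeque<>(sinks);
        Map<Node, Integer> visitedOutputs = new HashMap<>();
        while (!queue.isEmpty()) {
            Node current = queue.poll();
            result.add(current);
            for (Node input : current.inputs) {
                int visited = visitedOutputs.merge(input, 1, Integer::sum);
                if (visited == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }

    /** Same three rules as createMultipleInputGroups, visiting nodes from sinks to sources. */
    static void createGroups(List<Node> ordered) {
        for (Node current : ordered) {
            if (current.inputs.isEmpty() || current.isExchange) {
                continue; // sources and exchanges never become group members
            }
            Set<Node> outputGroup = commonOutputGroup(current);
            if (outputGroup != null) {
                outputGroup.add(current);
                current.group = outputGroup;
            } else if (current.inputs.size() >= 2) {
                Set<Node> newGroup = new LinkedHashSet<>();
                newGroup.add(current); // current becomes the root of a new group
                current.group = newGroup;
            }
        }
    }

    /** Returns the group shared by all outputs, or null if there is no such group. */
    static Set<Node> commonOutputGroup(Node node) {
        if (node.outputs.isEmpty() || node.outputs.get(0).group == null) {
            return null;
        }
        Set<Node> group = node.outputs.get(0).group;
        for (Node output : node.outputs) {
            if (output.group != group) {
                return null;
            }
        }
        return group;
    }

    static List<String> names(Collection<Node> nodes) {
        return nodes.stream().map(n -> n.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Node srcA = node("srcA", false);
        Node srcB = node("srcB", false);
        Node calcA = node("calcA", false, srcA);
        Node calcB = node("calcB", false, srcB);
        Node join = node("join", false, calcA, calcB);
        Node sink = node("sink", false, join);
        List<Node> ordered = topologicalSort(Collections.singletonList(sink));
        createGroups(ordered);
        System.out.println(names(ordered));    // [sink, join, calcA, calcB, srcA, srcB]
        System.out.println(names(join.group)); // [join, calcA, calcB]
    }
}
```

In this toy DAG the two calcs join the group rooted at the join, while the sink (one input) and the sources stay outside. The optimization pass (optimizations 1 to 4) is deliberately not modeled here.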

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see the appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+			boolean isUnion =
+				wrapper.execNode instanceof BatchExecUnion || wrapper.execNode instanceof StreamExecUnion;

Review comment:
       `wrapper.execNode instanceof Union`
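A self-contained sketch of what this suggestion amounts to, assuming (as the comment implies) that the batch and stream union nodes share a common `Union` supertype, presumably Calcite's; the class hierarchy below is hypothetical and only mirrors that shape. It also isolates the two union-related rules from `optimizeMultipleInputGroups`: optimization 1 always dissolves a single-member group rooted at a union, and optimization 2 removes a union tail unless one of its inputs is a FLIP-27 source.

```java
import java.util.Arrays;
import java.util.List;

public class UnionCheckSketch {

    /** Hypothetical minimal node; the planner's real types are ExecNode / RelNode. */
    abstract static class Node {
        final List<Node> inputs;

        Node(Node... inputs) {
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Hypothetical common supertype of both union flavours. */
    abstract static class Union extends Node {
        Union(Node... inputs) {
            super(inputs);
        }
    }

    static final class BatchUnion extends Union {
        BatchUnion(Node... inputs) {
            super(inputs);
        }
    }

    static final class StreamUnion extends Union {
        StreamUnion(Node... inputs) {
            super(inputs);
        }
    }

    static final class Join extends Node {
        Join(Node... inputs) {
            super(inputs);
        }
    }

    /** Hypothetical scan; newSource marks a FLIP-27 source. */
    static final class Scan extends Node {
        final boolean newSource;

        Scan(boolean newSource) {
            super(new Node[0]);
            this.newSource = newSource;
        }
    }

    // One instanceof against the shared supertype covers batch and stream unions alike.
    static boolean isUnion(Node node) {
        return node instanceof Union;
    }

    static boolean hasNewSourceInput(Node node) {
        return node.inputs.stream().anyMatch(in -> in instanceof Scan && ((Scan) in).newSource);
    }

    // Optimization 1: a single-member group is dissolved unless its root reads from a
    // FLIP-27 source; that exception never applies to unions.
    static boolean dissolveSingleMemberGroup(Node root) {
        return isUnion(root) || !hasNewSourceInput(root);
    }

    // Optimization 2: a union at the tail of a group is removed unless one of its
    // inputs is a FLIP-27 source.
    static boolean removeUnionTail(Node tail) {
        return isUnion(tail) && !hasNewSourceInput(tail);
    }

    public static void main(String[] args) {
        Node union = new BatchUnion(new Scan(true), new Scan(false));
        System.out.println(dissolveSingleMemberGroup(union)); // true
        System.out.println(removeUnionTail(union));           // false
        System.out.println(dissolveSingleMemberGroup(new Join(new Scan(true), new Scan(false)))); // false
    }
}
```

Checking the supertype keeps the condition correct if another union flavour is added later, at the cost of relying on both concrete classes really sharing that base type.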

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+	private boolean isNewSource(ExecNode<?, ?> node) {

Review comment:
       `SourceProvider` could also provide new `Source`
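One way the check could be broadened, sketched with hypothetical stand-in types rather than the planner's classes: besides DataStream scans whose transformation is a `SourceTransformation`, a scan over a table source whose scan runtime provider is a `SourceProvider` (the table connector provider that wraps a FLIP-27 `Source`) would also count as a new source. How the real scan nodes expose their provider is an assumption here and is not shown in this PR; every type below is a stub.

```java
public class NewSourceCheckSketch {

    // Hypothetical stand-ins for the transformation produced by a DataStream scan.
    interface Transformation {}

    static final class NewSourceTransformation implements Transformation {}

    static final class LegacySourceTransformation implements Transformation {}

    // Hypothetical stand-ins for a table source's scan runtime provider.
    interface RuntimeProvider {}

    static final class SourceProviderStub implements RuntimeProvider {}

    static final class SourceFunctionProviderStub implements RuntimeProvider {}

    interface Node {}

    /** Plays the role of BatchExecBoundedStreamScan / StreamExecDataStreamScan. */
    static final class DataStreamScan implements Node {
        final Transformation transformation;

        DataStreamScan(Transformation transformation) {
            this.transformation = transformation;
        }
    }

    /** Hypothetical scan over a table source that exposes its runtime provider. */
    static final class TableSourceScan implements Node {
        final RuntimeProvider provider;

        TableSourceScan(RuntimeProvider provider) {
            this.provider = provider;
        }
    }

    static final class Calc implements Node {}

    static boolean isNewSource(Node node) {
        if (node instanceof DataStreamScan) {
            // existing case: the DataStream was built from a FLIP-27 source
            return ((DataStreamScan) node).transformation instanceof NewSourceTransformation;
        } else if (node instanceof TableSourceScan) {
            // case raised in the review: a SourceProvider also hands out a FLIP-27 source
            return ((TableSourceScan) node).provider instanceof SourceProviderStub;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isNewSource(new TableSourceScan(new SourceProviderStub())));         // true
        System.out.println(isNewSource(new TableSourceScan(new SourceFunctionProviderStub()))); // false
        System.out.println(isNewSource(new DataStreamScan(new NewSourceTransformation())));     // true
        System.out.println(isNewSource(new DataStreamScan(new LegacySourceTransformation())));  // false
        System.out.println(isNewSource(new Calc()));                                            // false
    }
}
```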

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+	private final boolean isStreaming;
+
+	public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+		this.isStreaming = isStreaming;
+	}
+
+	@Override
+	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
+		if (!isStreaming) {
+			// As multiple input nodes use function calls to deliver records between sub-operators,
+			// we cannot rely on network buffers to buffer records that are not yet ready to be read,
+			// so only BLOCKING dam behavior is safe here.
+			// If a conflict is detected under this stricter constraint,
+			// we add a PIPELINED exchange to mark that its input and output nodes cannot be merged
+			// into the same multiple input node.
+			InputPriorityConflictResolver resolver = new InputPriorityConflictResolver(
+				sinkNodes,
+				Collections.emptySet(),
+				ExecEdge.DamBehavior.BLOCKING,
+				ShuffleMode.PIPELINED);
+			resolver.detectAndResolve();
+		}
+
+		List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+		// sort all nodes in topological order, sinks come first and sources come last
+		List<ExecNodeWrapper> orderedWrappers = topologicalSort(sinkWrappers);
+		// group nodes into multiple input groups
+		createMultipleInputGroups(orderedWrappers);
+		// apply optimizations to remove unnecessary nodes from multiple input groups
+		optimizeMultipleInputGroups(orderedWrappers);
+
+		// create the real multiple input nodes
+		return createMultipleInputNodes(sinkWrappers);
+	}
+
+	// --------------------------------------------------------------------------------
+	// Wrapping and Sorting
+	// --------------------------------------------------------------------------------
+
+	private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> sinkNodes) {
+		Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new HashMap<>();
+		AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() {
+			@Override
+			protected void visitNode(ExecNode<?, ?> node) {
+				ExecNodeWrapper wrapper = wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+				for (ExecNode<?, ?> input : node.getInputNodes()) {
+					ExecNodeWrapper inputWrapper = wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+					wrapper.inputs.add(inputWrapper);
+					inputWrapper.outputs.add(wrapper);
+				}
+				visitInputs(node);
+			}
+		};
+		sinkNodes.forEach(s -> s.accept(visitor));
+
+		List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+		for (ExecNode<?, ?> sink : sinkNodes) {
+			ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+			Preconditions.checkNotNull(sinkWrapper, "Sink node is not wrapped. This is a bug.");
+			sinkWrappers.add(sinkWrapper);
+		}
+		return sinkWrappers;
+	}
+
+	private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> sinkWrappers) {
+		List<ExecNodeWrapper> result = new ArrayList<>();
+		Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+		Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+		while (!queue.isEmpty()) {
+			ExecNodeWrapper wrapper = queue.poll();
+			result.add(wrapper);
+			for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+				int visitCount = visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+				if (visitCount == inputWrapper.outputs.size()) {
+					queue.offer(inputWrapper);
+				}
+			}
+		}
+
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Creating
+	// --------------------------------------------------------------------------------
+
+	private void createMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sinks to sources
+		for (ExecNodeWrapper wrapper : orderedWrappers) {
+			// we skip nodes which cannot be a member of a multiple input node
+			if (!canBeMultipleInputNodeMember(wrapper)) {
+				continue;
+			}
+
+			// we first try to assign this wrapper to the same group as its outputs
+			MultipleInputGroup outputGroup = canBeInSameGroupWithOutputs(wrapper);
+			if (outputGroup != null) {
+				wrapper.addToGroup(outputGroup);
+				continue;
+			}
+
+			// we then try to create a new multiple input group with this node as the root
+			if (canBeRootOfMultipleInputGroup(wrapper)) {
+				wrapper.createGroup();
+			}
+
+			// all our attempts failed, this node will not be in a multiple input node
+		}
+	}
+
+	private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+		if (wrapper.inputs.isEmpty()) {
+			// sources cannot be members of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof BatchExecExchange) {
+			// an exchange cannot be a member of a multiple input node
+			return false;
+		}
+		if (wrapper.execNode instanceof StreamExecExchange) {
+			// an exchange cannot be a member of a multiple input node
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * A node can only be assigned to the same multiple input group as its outputs
+	 * if all outputs belong to the same group.
+	 *
+	 * @return the {@link MultipleInputGroup} shared by all outputs if they all belong
+	 *         to the same group, null otherwise
+	 */
+	private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+		if (wrapper.outputs.isEmpty()) {
+			return null;
+		}
+
+		MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+		if (outputGroup == null) {
+			return null;
+		}
+
+		for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+			if (outputWrapper.group != outputGroup) {
+				return null;
+			}
+		}
+
+		return outputGroup;
+	}
+
+	private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+		// only a node with more than one input can be the root,
+		// as chaining of one-input operators is already handled by operator chains
+		return wrapper.inputs.size() >= 2;
+	}
+
+	// --------------------------------------------------------------------------------
+	// Multiple Input Groups Optimizing
+	// --------------------------------------------------------------------------------
+
+	private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) {
+		// wrappers are checked in topological order from sources to sinks
+		for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+			ExecNodeWrapper wrapper = orderedWrappers.get(i);
+			MultipleInputGroup group = wrapper.group;
+			if (group == null) {
+				// we only consider nodes currently in a multiple input group
+				continue;
+			}
+
+			boolean isUnion =
+				wrapper.execNode instanceof BatchExecUnion || wrapper.execNode instanceof StreamExecUnion;
+
+			if (group.members.size() == 1) {
+				Preconditions.checkState(
+					wrapper == group.root,
+					"The only member of a multiple input group is not its root. This is a bug.");
+				// optimization 1. we clean up multiple input groups with only 1 member,
+				// unless one of its inputs is a FLIP-27 source (for maximizing source chaining);
+				// however, this FLIP-27 exception does not apply to unions because they're not real operators
+				if (isUnion || wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode))) {
+					wrapper.removeFromGroup();
+				}
+				continue;
+			}
+
+			if (!isTailOfMultipleInputGroup(wrapper)) {
+				// we're not removing a node from the middle of a multiple input group
+				continue;
+			}
+
+			boolean shouldRemove = false;
+			if (isUnion) {
+				// optimization 2. we do not allow a union to be the tail of a multiple input group,
+				// as it costs extra function calls, unless one of the united
+				// inputs is a FLIP-27 source
+				shouldRemove = wrapper.inputs.stream().noneMatch(inputWrapper -> isNewSource(inputWrapper.execNode));
+			} else if (wrapper.inputs.size() == 1) {
+				// optimization 3. we remove a one-input operator unless its input
+				// is an exchange or a FLIP-27 source; this mainly avoids the following
+				// pattern:
+				// non-chainable source -> calc --\
+				//                                 join ->
+				// non-chainable source -> calc --/
+				// if we moved both calcs into the multiple input group rooted at the join, we would
+				// directly shuffle a large number of records from the sources without first filtering
+				// them with the calcs
+				ExecNode<?, ?> input = wrapper.inputs.get(0).execNode;
+				shouldRemove = !(input instanceof BatchExecExchange) &&
+					!(input instanceof StreamExecExchange) &&
+					!isNewSource(input);
+			}
+
+			// optimization 4. singleton operations (for example a singleton global agg)
+			// are not included in the multiple input node, as that would force the whole
+			// multiple input node to run with a parallelism of 1.
+			// consecutive singleton operations connected by a forward shuffle are handled
+			// together with optimization 3
+			shouldRemove |= wrapper.inputs.stream().anyMatch(inputWrapper ->
+				inputWrapper.execNode instanceof BatchExecExchange &&
+					((BatchExecExchange) inputWrapper.execNode)
+						.distribution.getType() == RelDistribution.Type.SINGLETON);
+
+			if (shouldRemove) {
+				wrapper.removeFromGroup();
+			}
+		}
+	}
+
+	private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {

Review comment:
       `Tail` => `Header` ?
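
The `topologicalSort` method in the hunk above is a Kahn-style traversal run from the sinks: an input is enqueued only after all of its outputs have been emitted, so sinks come first and sources come last. The following is a minimal standalone sketch of the same idea. It uses a hypothetical `SortNode` class in place of `ExecNodeWrapper` and an `ArrayDeque` instead of a `LinkedList`. It is an illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical DAG node: edges are stored in both directions, as ExecNodeWrapper does. */
final class SortNode {
    final String name;
    final List<SortNode> inputs = new ArrayList<>();
    final List<SortNode> outputs = new ArrayList<>();

    SortNode(String name) {
        this.name = name;
    }

    /** Connects input -> this, recording the edge on both ends. */
    void addInput(SortNode input) {
        inputs.add(input);
        input.outputs.add(this);
    }
}

final class SinkFirstSort {
    /** Orders nodes so that every node appears after all of its outputs (sinks first). */
    static List<SortNode> sort(List<SortNode> sinks) {
        List<SortNode> result = new ArrayList<>();
        Queue<SortNode> queue = new ArrayDeque<>(sinks);
        Map<SortNode, Integer> visitCount = new HashMap<>();
        while (!queue.isEmpty()) {
            SortNode node = queue.poll();
            result.add(node);
            for (SortNode input : node.inputs) {
                // count how many of the input's outputs have been emitted so far
                int count = visitCount.merge(input, 1, Integer::sum);
                // emit the input only once every one of its outputs is in the result
                if (count == input.outputs.size()) {
                    queue.offer(input);
                }
            }
        }
        return result;
    }
}
```

For a diamond `source -> a, b -> sink`, sorting from `sink` yields `sink, a, b, source`. An edge used twice (for example in a self-join) is recorded twice in both `inputs` and `outputs`, so the counts still match.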

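`createMultipleInputGroups` walks the sink-first order. For each eligible node, it either joins the group shared by all of its outputs or, if the node has at least two inputs, starts a new group rooted at itself. Sources and exchanges are skipped. The following simplified sketch assumes a hypothetical `GroupNode` whose `group` field points at the group's root node rather than at a `MultipleInputGroup` object. The `isExchange` flag stands in for the `BatchExecExchange`/`StreamExecExchange` checks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical node; isExchange replaces the planner's exchange instanceof checks. */
final class GroupNode {
    final String name;
    final boolean isExchange;
    final List<GroupNode> inputs = new ArrayList<>();
    final List<GroupNode> outputs = new ArrayList<>();
    // root of the group this node belongs to (stand-in for MultipleInputGroup), or null
    GroupNode group;

    GroupNode(String name, boolean isExchange) {
        this.name = name;
        this.isExchange = isExchange;
    }

    void addInput(GroupNode input) {
        inputs.add(input);
        input.outputs.add(this);
    }
}

final class GroupCreation {
    /** Assigns groups; orderedNodes must be sorted sinks first. */
    static void createGroups(List<GroupNode> orderedNodes) {
        for (GroupNode node : orderedNodes) {
            // sources and exchanges can never be members of a group
            if (node.inputs.isEmpty() || node.isExchange) {
                continue;
            }
            GroupNode outputGroup = commonOutputGroup(node);
            if (outputGroup != null) {
                // join the group shared by all outputs
                node.group = outputGroup;
            } else if (node.inputs.size() >= 2) {
                // start a new group rooted at this node
                node.group = node;
            }
            // otherwise the node stays outside every group
        }
    }

    /** Returns the group shared by all outputs, or null if the outputs do not share one. */
    static GroupNode commonOutputGroup(GroupNode node) {
        if (node.outputs.isEmpty()) {
            return null;
        }
        GroupNode first = node.outputs.get(0).group;
        if (first == null) {
            return null;
        }
        for (GroupNode output : node.outputs) {
            if (output.group != first) {
                return null;
            }
        }
        return first;
    }
}
```

Consider a plan in which `calc` and an exchange feed a two-input `join`, which feeds a `sink`. The join becomes a root, the calc is pulled into the join's group, and the one-input sink and the exchanges stay outside.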

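Optimizations 2 to 4 in the hunk above decide whether a tail member of a multi-member group should leave it. The following simplified sketch of that decision assumes operators can be reduced to a hypothetical `OpKind` enum. It has three limitations. It does not cover optimization 1 (single-member groups). It ignores that the singleton check in the diff only inspects `BatchExecExchange`. It leaves the actual `isNewSource` test, which is truncated above, as an opaque `NEW_SOURCE` kind.

```java
import java.util.List;

/** Hypothetical operator kinds standing in for the planner's instanceof checks. */
enum OpKind { EXCHANGE, SINGLETON_EXCHANGE, NEW_SOURCE, UNION, CALC, JOIN, OTHER }

final class TailRemoval {
    /** Returns true if a tail member with the given inputs should leave its group. */
    static boolean shouldRemove(OpKind node, List<OpKind> inputs) {
        boolean hasNewSourceInput = inputs.contains(OpKind.NEW_SOURCE);
        boolean remove = false;
        if (node == OpKind.UNION) {
            // optimization 2: a union tail stays only if it unites a FLIP-27 source
            remove = !hasNewSourceInput;
        } else if (inputs.size() == 1) {
            // optimization 3: a one-input tail stays only if it reads from an exchange
            // (a singleton exchange is still an exchange) or a FLIP-27 source
            OpKind input = inputs.get(0);
            remove = input != OpKind.EXCHANGE
                && input != OpKind.SINGLETON_EXCHANGE
                && input != OpKind.NEW_SOURCE;
        }
        // optimization 4: a node fed by a singleton exchange always leaves the group
        remove |= inputs.contains(OpKind.SINGLETON_EXCHANGE);
        return remove;
    }
}
```

For example, a calc reading directly from a non-chainable source (`OTHER` here) is removed, while a calc reading from an exchange stays.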


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 02e0db125833bf12ab5856b08a82a03fc5454958 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8276) 
   * 1121f5b1525294821397a62519e33f21c01a097a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8287) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 1121f5b1525294821397a62519e33f21c01a097a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8287) 
   * 006756c81fa5c333b6fdc4bcbfc8820492a18e9b UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13742: [FLINK-19626][table-planner-blink] Introduce multi-input operator construction algorithm

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13742:
URL: https://github.com/apache/flink/pull/13742#issuecomment-714296461


   ## CI report:
   
   * 37cba38abc15a3ee7d1193644be564c0c75ac4c1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8084) 
   * ddbf38d29507b98186d2b1a22716e7fb9593f5db UNKNOWN
   

