You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/15 13:28:20 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r505509149



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {

Review comment:
       It would be better to use a Builder to create an `ExecEdge`: the priority of every single-input node is always 0, many nodes provide an unknown `RequiredShuffle`, and more properties (e.g. the source/target node) may be added in the future.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {

Review comment:
       Add some comments for the public APIs.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/StreamExecNode.scala
##########
@@ -21,7 +21,21 @@ package org.apache.flink.table.planner.plan.nodes.exec
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.utils.Logging
 
+import java.util
+
 /**
   * Base class for stream ExecNode.
   */
-trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging
+trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging {
+
+  def getInputEdges: util.List[ExecEdge] = {
+    val edges = new util.ArrayList[ExecEdge]()
+    for (_ <- 0 until getInputNodes.size()) {
+      edges.add(new ExecEdge(
+        ExecEdge.RequiredShuffle.unknown(),

Review comment:
       Add a TODO that explains why the `RequiredShuffle` is unknown here.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);
+		}
+	}
+
+	/**
+	 * Enumeration which describes the shuffle type for records when passing this edge.
+	 */
+	public enum ShuffleType {
+
+		/**
+		 * Any type of shuffle is OK when passing through this edge.
+		 */
+		ANY,
+
+		/**
+		 * Records are shuffle by hash when passing through this edge.
+		 */
+		HASH,
+
+		/**
+		 * Each sub-partition contains full records.
+		 */
+		BROADCAST,
+
+		/**
+		 * The parallelism of the target node must be 1.
+		 */
+		SINGLETON,
+
+		/**
+		 * Unknown shuffle type, will be filled out in the future.
+		 */
+		UNKNOWN
+	}
+
+	/**
+	 * Enumeration which describes how an output record from the source node
+	 * may trigger the output of the target node.
+	 */
+	public enum EdgeBehavior {

Review comment:
       How about renaming this to `EdgeDamBehavior`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);

Review comment:
       Provide another constructor, `private RequiredShuffle(ShuffleType type)`, so that shuffle types without keys do not need to pass `new int[0]`.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {

Review comment:
       Require that `keys` is not empty here.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);
+		}
+	}
+
+	/**
+	 * Enumeration which describes the shuffle type for records when passing this edge.
+	 */
+	public enum ShuffleType {
+
+		/**
+		 * Any type of shuffle is OK when passing through this edge.
+		 */
+		ANY,
+
+		/**
+		 * Records are shuffle by hash when passing through this edge.
+		 */
+		HASH,
+
+		/**
+		 * Each sub-partition contains full records.

Review comment:
       Describe the target in terms of `ExecNode` rather than (sub-)partitions.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
##########
@@ -117,6 +116,30 @@ class BatchExecExchange(
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] = {
+    val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+    val shuffleMode = getShuffleMode(tableConfig.getConfiguration)
+    if (shuffleMode eq ShuffleMode.BATCH) {
+      List(new ExecEdge(
+        ExecEdge.RequiredShuffle.unknown(),
+        ExecEdge.EdgeBehavior.BLOCKING,
+        0))
+    } else {
+      distribution.getType match {
+        case RelDistribution.Type.RANGE_DISTRIBUTED =>
+          List(new ExecEdge(
+            ExecEdge.RequiredShuffle.unknown(),
+            ExecEdge.EdgeBehavior.BLOCKING,

Review comment:
       Should this be `END_INPUT`?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
##########
@@ -192,6 +193,11 @@ class BatchExecSortMergeJoin(
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] =
+    List(
+      new ExecEdge(ExecEdge.RequiredShuffle.unknown(), ExecEdge.EdgeBehavior.END_INPUT, 0),

Review comment:
       This should take `leftSorted` and `rightSorted` into account.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org