Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 04:19:28 UTC

[GitHub] [flink] TsReaper opened a new pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

TsReaper opened a new pull request #13625:
URL: https://github.com/apache/flink/pull/13625


   ## What is the purpose of the change
   
   The deadlock breakup algorithm and the multi-input operator creation algorithm need information about the input edges of an exec node, for example the priority of each input and how the input records trigger the output records.
   
   Although `BatchExecNode` currently has a `getDamBehavior` method, it only describes the behavior of the node and is not very useful for the new deadlock breakup algorithm. This PR therefore introduces a new class `ExecEdge` to describe the input edges and a new method `getInputEdges` for `ExecNode`.
   
   The current implementation of `getInputEdges` for `ExecNode`s collects and rewrites existing code (mostly from `getDamBehavior`) and has no test coverage of its own. `getInputEdges` will be called once the new deadlock breakup and multi-input operator creation algorithms are introduced, and will be tested along with those algorithms.
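
As a rough illustration of the per-edge information described above, here is a minimal sketch of an edge descriptor. It is not Flink's `ExecEdge`: the names `ExecEdgeSketch`, `Edge` and `higherPriorityInput`, the `Blocking` level, and the rule that a smaller number means a higher priority are assumptions made for this example. Only `stricterOrEqual`, the `PIPELINED` / `END_INPUT` behaviors and the idea of a per-input priority come from the PR.

```scala
// Simplified, hypothetical model of an input-edge descriptor (not Flink's ExecEdge).
object ExecEdgeSketch {

  // How records on one input trigger output, ordered from least to most strict.
  sealed abstract class DamBehavior(val strictness: Int) {
    def stricterOrEqual(other: DamBehavior): Boolean = strictness >= other.strictness
  }
  case object Pipelined extends DamBehavior(0) // output may follow each input record
  case object EndInput extends DamBehavior(1)  // output only once this input has ended
  case object Blocking extends DamBehavior(2)  // assumed stricter level, for illustration

  // Assumption: a smaller number means the input is read earlier (higher priority).
  final case class Edge(damBehavior: DamBehavior = Pipelined, priority: Int = 0)

  // Index of the input to read first, or None when all inputs share one priority.
  def higherPriorityInput(edges: Seq[Edge]): Option[Int] = {
    val priorities = edges.map(_.priority)
    if (priorities.distinct.size <= 1) None
    else Some(priorities.indexOf(priorities.min))
  }

  def main(args: Array[String]): Unit = {
    // A join-like node whose first input must be read before the second one.
    val joinLike = Seq(Edge(Blocking, priority = 0), Edge(Pipelined, priority = 1))
    // A single-input node using the default edge.
    val calcLike = Seq(Edge())
    println(higherPriorityInput(joinLike)) // Some(0)
    println(higherPriorityInput(calcLike)) // None
    println(EndInput.stricterOrEqual(Pipelined)) // true
  }
}
```

Describing each input separately lets a planner pass reason about two-input nodes without knowing their concrete class, which a single node-level `getDamBehavior` does not allow.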
   
   ## Brief change log
   
    - Introduce `ExecEdge`
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-712128956


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567",
       "triggerID" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570",
       "triggerID" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724",
       "triggerID" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738",
       "triggerID" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26a40dc87e7f977e84f24c5b509b60f2b083d5c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f26a40dc87e7f977e84f24c5b509b60f2b083d5c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567",
       "triggerID" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570",
       "triggerID" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724",
       "triggerID" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738",
       "triggerID" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26a40dc87e7f977e84f24c5b509b60f2b083d5c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f26a40dc87e7f977e84f24c5b509b60f2b083d5c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831",
       "triggerID" : "741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "193e9a611b030c9809eabeb057a31bce9b22b151",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843",
       "triggerID" : "193e9a611b030c9809eabeb057a31bce9b22b151",
       "triggerType" : "PUSH"
     }, {
       "hash" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849",
       "triggerID" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "triggerType" : "PUSH"
     }, {
       "hash" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7868",
       "triggerID" : "712128956",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "193e9a611b030c9809eabeb057a31bce9b22b151",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843",
       "triggerID" : "712128956",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849",
       "triggerID" : "712128956",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7868) Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849) 
   





[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r507511648



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -175,29 +186,32 @@ class DeadlockBreakupProcessor extends DAGProcessor {
               probeRel,
               distribution)
             e.setRequiredShuffleMode(ShuffleMode.BATCH)
-            // replace join node's input
-            join.replaceInputNode(probeSideIndex, e)
+            // replace node's input
+            node.asInstanceOf[BatchExecNode[_]].replaceInputNode(probeSideIndex, e)
         }
       }
     }
 
     override def visit(node: ExecNode[_, _]): Unit = {
       super.visit(node)
-      node match {
-        case hashJoin: BatchExecHashJoin =>
-          val joinInfo = hashJoin.getJoinInfo
-          val columns = if (hashJoin.leftIsBuild) joinInfo.rightKeys else joinInfo.leftKeys
-          val distribution = FlinkRelDistribution.hash(columns)
-          rewriteJoin(hashJoin, hashJoin.leftIsBuild, distribution)
-        case nestedLoopJoin: BatchExecNestedLoopJoin =>
-          rewriteJoin(nestedLoopJoin, nestedLoopJoin.leftIsBuild, FlinkRelDistribution.ANY)
-        case _ => // do nothing
+      val inputEdges = node.getInputEdges
+      if (inputEdges.size() == 2) {
+        val leftPriority = inputEdges.get(0).getPriority
+        val rightPriority = inputEdges.get(1).getPriority
+        val requiredShuffle = if (leftPriority == 1) {

Review comment:
       not changed here

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -296,44 +311,48 @@ class DeadlockBreakupProcessor extends DAGProcessor {
     * Returns true if all input-paths have barrier node (e.g. agg, sort), otherwise false.
     */
   private def hasBarrierNodeInInputPaths(
-      inputPathsOfProbeSide: List[Array[ExecNode[_, _]]]): Boolean = {
-    require(inputPathsOfProbeSide.nonEmpty)
+      inputPathsOfLowerInput: List[Array[ExecNode[_, _]]]): Boolean = {
+    require(inputPathsOfLowerInput.nonEmpty)
 
-    /** Return true if the successor of join in the input-path is build node, otherwise false */
-    def checkJoinBuildSide(
-        buildNode: ExecNode[_, _],
-        idxOfJoin: Int,
+    /** Return true if the successor in the input-path is also in higher input, otherwise false */
+    def checkHigherInput(
+        higherNode: ExecNode[_, _],
+        idx: Int,
         inputPath: Array[ExecNode[_, _]]): Boolean = {
-      if (idxOfJoin < inputPath.length - 1) {
-        val nextNode = inputPath(idxOfJoin + 1)
-        // next node is build node of hash join
-        buildNode eq nextNode
+      if (idx < inputPath.length - 1) {
+        val nextNode = inputPath(idx + 1)
+        // next node is higher input
+        higherNode eq nextNode
       } else {
         false
       }
     }
 
-    inputPathsOfProbeSide.forall {
+    inputPathsOfLowerInput.forall {
       inputPath =>
         var idx = 0
         var hasFullDamNode = false
         // should exclude the reused node (at last position in path)
         while (!hasFullDamNode && idx < inputPath.length - 1) {
           val node = inputPath(idx)
-          val nodeDamBehavior = node.asInstanceOf[BatchExecNode[_]].getDamBehavior
-          hasFullDamNode = if (nodeDamBehavior == DamBehavior.FULL_DAM) {
+          val atLeastEndInput = node.getInputEdges.forall(
+            e => e.getDamBehavior.stricterOrEqual(ExecEdge.DamBehavior.END_INPUT))
+          hasFullDamNode = if (atLeastEndInput) {
             true
           } else {
-            node match {
-              case h: BatchExecHashJoin =>
-                val buildSideIndex = if (h.leftIsBuild) 0 else 1
-                val buildNode = h.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case n: BatchExecNestedLoopJoin =>
-                val buildSideIndex = if (n.leftIsBuild) 0 else 1
-                val buildNode = n.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case _ => false
+            val inputEdges = node.getInputEdges
+            if (inputEdges.size() == 2) {
+              val leftPriority = inputEdges.get(0).getPriority
+              val rightPriority = inputEdges.get(1).getPriority
+              if (leftPriority != rightPriority) {
+                val higherIndex = if (leftPriority == 0) 0 else 1

Review comment:
       not changed here







[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506540225



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala
##########
@@ -112,11 +111,11 @@ abstract class BatchExecCalcBase(
 
   //~ ExecNode methods -----------------------------------------------------------
 
-  override def getDamBehavior = DamBehavior.PIPELINED
-
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     List(getInput.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.builder().build())

Review comment:
       Create a default `ExecEdge` instance?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -320,8 +319,9 @@ class DeadlockBreakupProcessor extends DAGProcessor {
         // should exclude the reused node (at last position in path)
         while (!hasFullDamNode && idx < inputPath.length - 1) {
           val node = inputPath(idx)
-          val nodeDamBehavior = node.asInstanceOf[BatchExecNode[_]].getDamBehavior
-          hasFullDamNode = if (nodeDamBehavior == DamBehavior.FULL_DAM) {
+          val atLeastEndInput = node.getInputEdges.forall(
+            e => e.getDamBehavior.stricterOrEqual(ExecEdge.DamBehavior.END_INPUT))
+          hasFullDamNode = if (atLeastEndInput) {
             true
           } else {
             node match {

Review comment:
       We should avoid matching on specific classes here.
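
To illustrate the direction of this comment, here is a minimal sketch of a barrier check that looks only at the dam behavior of each node's input edges and never matches on a concrete node class. `BarrierPathSketch`, `Node`, `pathHasBarrier` and `allPathsHaveBarrier` are hypothetical names for this example, and the sketch leaves out the priority-based `checkHigherInput` branch of the real method.

```scala
object BarrierPathSketch {

  sealed abstract class Dam(val strictness: Int) {
    def stricterOrEqual(other: Dam): Boolean = strictness >= other.strictness
  }
  case object Pipelined extends Dam(0)
  case object EndInput extends Dam(1)

  // Hypothetical node: only the dam behavior of each input edge matters here.
  final case class Node(name: String, inputDams: Seq[Dam])

  // True if some node on the path, excluding the reused node at the last
  // position, only produces output after all of its inputs have ended.
  // Note: a node with no input edges would pass the forall check vacuously.
  def pathHasBarrier(path: Seq[Node]): Boolean =
    path.dropRight(1).exists { node =>
      node.inputDams.forall(dam => dam.stricterOrEqual(EndInput))
    }

  // Mirrors the shape of hasBarrierNodeInInputPaths: every path needs a barrier.
  def allPathsHaveBarrier(paths: Seq[Seq[Node]]): Boolean = {
    require(paths.nonEmpty)
    paths.forall(pathHasBarrier)
  }

  def main(args: Array[String]): Unit = {
    val reused = Node("reused", Seq(Pipelined))
    val withSort = Seq(Node("calc", Seq(Pipelined)), Node("sort", Seq(EndInput)), reused)
    val pipelinedOnly = Seq(Node("calc", Seq(Pipelined)), reused)
    println(allPathsHaveBarrier(Seq(withSort)))                // true
    println(allPathsHaveBarrier(Seq(withSort, pipelinedOnly))) // false
  }
}
```

Because the decision depends only on edge properties, a new node type would be covered as soon as it reports its input edges.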







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567",
       "triggerID" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2599ad69ed0b2117dff5fd47eb5b51208a2b9efb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567",
       "triggerID" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570",
       "triggerID" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724",
       "triggerID" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1d3dfd2b49f980db8ac9189ad4bf0d310e19468c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570) 
   * 5a91c75948265e549afbc512bcc66ef820e1562c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724) 
   





[GitHub] [flink] TsReaper commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506053031



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
##########
@@ -192,6 +193,11 @@ class BatchExecSortMergeJoin(
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] =
+    List(
+      new ExecEdge(ExecEdge.RequiredShuffle.unknown(), ExecEdge.EdgeBehavior.END_INPUT, 0),

Review comment:
       No need. Records are only emitted in the `endInput` method for sort merge join. See `SortMergeJoinOperator`.
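
The behavior described in this comment can be pictured with a toy two-input operator that buffers everything and emits only after both inputs have ended. This is a hypothetical sketch, not `SortMergeJoinOperator`: the class `BufferingJoin`, its method names and the 0-based input ids are assumptions for this example, and the join itself is a naive nested loop over sorted buffers rather than a real merge.

```scala
import scala.collection.mutable.ArrayBuffer

object EndInputSketch {

  // Toy two-input operator: nothing is emitted while records arrive.
  final class BufferingJoin(emit: (Int, String, String) => Unit) {
    private val left = ArrayBuffer.empty[(Int, String)]
    private val right = ArrayBuffer.empty[(Int, String)]
    private val ended = Array(false, false)

    def processLeft(key: Int, value: String): Unit = left += ((key, value))
    def processRight(key: Int, value: String): Unit = right += ((key, value))

    // Called once per input (0 = left, 1 = right) when that input is exhausted.
    def endInput(inputId: Int): Unit = {
      ended(inputId) = true
      if (ended.forall(identity)) {
        // Both inputs are complete: sort the buffers and emit matching pairs.
        for {
          (lk, lv) <- left.sortBy(_._1)
          (rk, rv) <- right.sortBy(_._1)
          if lk == rk
        } emit(lk, lv, rv)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val op = new BufferingJoin((k, l, r) => println(s"$k: $l, $r"))
    op.processLeft(2, "b")
    op.processLeft(1, "a")
    op.processRight(1, "x")
    op.endInput(0) // nothing printed yet
    op.endInput(1) // output is produced only now
  }
}
```

Since no output can appear before an input ends, both edges of such an operator can be described as `END_INPUT`.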







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567",
       "triggerID" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570",
       "triggerID" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724",
       "triggerID" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5a91c75948265e549afbc512bcc66ef820e1562c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724) 
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567",
       "triggerID" : "2599ad69ed0b2117dff5fd47eb5b51208a2b9efb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570",
       "triggerID" : "1d3dfd2b49f980db8ac9189ad4bf0d310e19468c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724",
       "triggerID" : "5a91c75948265e549afbc512bcc66ef820e1562c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738",
       "triggerID" : "4c31824917199ca34801d2bde871c5d1f95d0cc2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26a40dc87e7f977e84f24c5b509b60f2b083d5c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f26a40dc87e7f977e84f24c5b509b60f2b083d5c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831",
       "triggerID" : "741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "193e9a611b030c9809eabeb057a31bce9b22b151",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843",
       "triggerID" : "193e9a611b030c9809eabeb057a31bce9b22b151",
       "triggerType" : "PUSH"
     }, {
       "hash" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849",
       "triggerID" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "triggerType" : "PUSH"
     }, {
       "hash" : "56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7868",
       "triggerID" : "712128956",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "193e9a611b030c9809eabeb057a31bce9b22b151",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843",
       "triggerID" : "712128956",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 193e9a611b030c9809eabeb057a31bce9b22b151 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843) 
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7868) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831) 
   * 193e9a611b030c9809eabeb057a31bce9b22b151 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-712130593


   Azure passed in https://dev.azure.com/tsreaper96/Flink/_build/results?buildId=71





[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506906315



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -146,22 +146,33 @@ class DeadlockBreakupProcessor extends DAGProcessor {
 
   class DeadlockBreakupVisitor(finder: ReuseNodeFinder) extends ExecNodeVisitorImpl {
 
-    private def rewriteJoin(
-        join: BatchExecJoinBase,
-        leftIsBuild: Boolean,
-        distribution: FlinkRelDistribution): Unit = {
-      val (buildSideIndex, probeSideIndex) = if (leftIsBuild) (0, 1) else (1, 0)
-      val buildNode = join.getInputNodes.get(buildSideIndex)
-      val probeNode = join.getInputNodes.get(probeSideIndex)
+    private def rewriteTwoInputNode(
+        node: ExecNode[_, _],
+        leftPriority: Int,
+        requiredShuffle: ExecEdge.RequiredShuffle): Unit = {
+      val (buildSideIndex, probeSideIndex) = if (leftPriority == 0) (0, 1) else (1, 0)
+      val buildNode = node.getInputNodes.get(buildSideIndex)

Review comment:
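       There is no build/probe concept here, so names such as `buildSideIndex` and `buildNode` no longer fit a generic two-input node.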







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831) 
   * 193e9a611b030c9809eabeb057a31bce9b22b151 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 1d3dfd2b49f980db8ac9189ad4bf0d310e19468c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570) 
   * 5a91c75948265e549afbc512bcc66ef820e1562c UNKNOWN
   



[GitHub] [flink] flinkbot commented on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708147417


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 2599ad69ed0b2117dff5fd47eb5b51208a2b9efb (Wed Oct 14 04:22:39 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19623).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 5a91c75948265e549afbc512bcc66ef820e1562c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724) 
   



[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506906883



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -146,22 +146,33 @@ class DeadlockBreakupProcessor extends DAGProcessor {
 
   class DeadlockBreakupVisitor(finder: ReuseNodeFinder) extends ExecNodeVisitorImpl {
 
-    private def rewriteJoin(
-        join: BatchExecJoinBase,
-        leftIsBuild: Boolean,
-        distribution: FlinkRelDistribution): Unit = {
-      val (buildSideIndex, probeSideIndex) = if (leftIsBuild) (0, 1) else (1, 0)
-      val buildNode = join.getInputNodes.get(buildSideIndex)
-      val probeNode = join.getInputNodes.get(probeSideIndex)
+    private def rewriteTwoInputNode(
+        node: ExecNode[_, _],
+        leftPriority: Int,
+        requiredShuffle: ExecEdge.RequiredShuffle): Unit = {
+      val (buildSideIndex, probeSideIndex) = if (leftPriority == 0) (0, 1) else (1, 0)

Review comment:
       Is the highest priority always 0? Do we have this guarantee?
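
One way to avoid relying on that guarantee is to compare the priorities of the two inputs instead of testing for a literal `0`. The following is only a minimal sketch, not code from this PR: the class `InputPriorities` and the method `highestPriorityIndex` are hypothetical names, and it assumes that a smaller integer means a higher priority, which is what the `leftPriority == 0` check in the diff suggests.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: picks the input that must be
// consumed first without assuming that the highest priority is exactly 0.
final class InputPriorities {

    private InputPriorities() {}

    // Returns the index of the input with the smallest priority value.
    // Assumption: a smaller value means a higher priority.
    // On a tie the first such input is returned.
    static int highestPriorityIndex(List<Integer> priorities) {
        if (priorities.isEmpty()) {
            throw new IllegalArgumentException("at least one input is required");
        }
        int best = 0;
        for (int i = 1; i < priorities.size(); i++) {
            if (priorities.get(i) < priorities.get(best)) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Left input has priority 3, right input has priority 1:
        // the right input (index 1) has the higher priority.
        System.out.println(highestPriorityIndex(List.of(3, 1)));
    }
}
```

With such a comparison the two-input rewrite would only depend on the relative order of the priorities. Whether equal priorities should be treated as "no preferred side" is a separate design question that this sketch does not answer.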







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 2599ad69ed0b2117dff5fd47eb5b51208a2b9efb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567) 
   * 1d3dfd2b49f980db8ac9189ad4bf0d310e19468c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7868) Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 193e9a611b030c9809eabeb057a31bce9b22b151 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843) 
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 1d3dfd2b49f980db8ac9189ad4bf0d310e19468c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7570) 
   





[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r505509149



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {

Review comment:
       It would be better to use a Builder to create an `ExecEdge`: the priority of every single-input node is always 0, many nodes provide an unknown `RequiredShuffle`, and more properties (e.g. source/target node) may be added in the future.
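
A minimal sketch of what such a builder might look like. This is only an illustration: the class name `ExecEdgeSketch`, the default values, and the `PIPELINED` constant are assumptions, and the shuffle requirement is reduced to a bare `ShuffleType` to keep the example short.

```java
// Hypothetical sketch of a builder for an ExecEdge-like class.
// Defaults and the PIPELINED constant are assumptions, not taken from the PR.
final class ExecEdgeSketch {

    enum ShuffleType { ANY, HASH, BROADCAST, SINGLETON, UNKNOWN }

    enum EdgeBehavior { PIPELINED, END_INPUT, BLOCKING }

    private final ShuffleType shuffleType;
    private final EdgeBehavior edgeBehavior;
    private final int priority;

    private ExecEdgeSketch(ShuffleType shuffleType, EdgeBehavior edgeBehavior, int priority) {
        this.shuffleType = shuffleType;
        this.edgeBehavior = edgeBehavior;
        this.priority = priority;
    }

    ShuffleType getShuffleType() { return shuffleType; }

    EdgeBehavior getEdgeBehavior() { return edgeBehavior; }

    int getPriority() { return priority; }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        // Defaults cover the common case: shuffle not yet known, priority 0.
        private ShuffleType shuffleType = ShuffleType.UNKNOWN;
        private EdgeBehavior edgeBehavior = EdgeBehavior.PIPELINED;
        private int priority = 0;

        Builder shuffleType(ShuffleType shuffleType) {
            this.shuffleType = shuffleType;
            return this;
        }

        Builder edgeBehavior(EdgeBehavior edgeBehavior) {
            this.edgeBehavior = edgeBehavior;
            return this;
        }

        Builder priority(int priority) {
            this.priority = priority;
            return this;
        }

        ExecEdgeSketch build() {
            return new ExecEdgeSketch(shuffleType, edgeBehavior, priority);
        }
    }
}
```

With defaults in the builder, a node only has to state what differs from the common case, for example `ExecEdgeSketch.builder().edgeBehavior(ExecEdgeSketch.EdgeBehavior.BLOCKING).build()`, and new properties can be added later without touching existing call sites.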

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {

Review comment:
       Add some comments for the public APIs.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/StreamExecNode.scala
##########
@@ -21,7 +21,21 @@ package org.apache.flink.table.planner.plan.nodes.exec
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.utils.Logging
 
+import java.util
+
 /**
   * Base class for stream ExecNode.
   */
-trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging
+trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging {
+
+  def getInputEdges: util.List[ExecEdge] = {
+    val edges = new util.ArrayList[ExecEdge]()
+    for (_ <- 0 until getInputNodes.size()) {
+      edges.add(new ExecEdge(
+        ExecEdge.RequiredShuffle.unknown(),

Review comment:
       Add a TODO and explain why the `RequiredShuffle` is unknown.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);
+		}
+	}
+
+	/**
+	 * Enumeration which describes the shuffle type for records when passing this edge.
+	 */
+	public enum ShuffleType {
+
+		/**
+		 * Any type of shuffle is OK when passing through this edge.
+		 */
+		ANY,
+
+		/**
+		 * Records are shuffle by hash when passing through this edge.
+		 */
+		HASH,
+
+		/**
+		 * Each sub-partition contains full records.
+		 */
+		BROADCAST,
+
+		/**
+		 * The parallelism of the target node must be 1.
+		 */
+		SINGLETON,
+
+		/**
+		 * Unknown shuffle type, will be filled out in the future.
+		 */
+		UNKNOWN
+	}
+
+	/**
+	 * Enumeration which describes how an output record from the source node
+	 * may trigger the output of the target node.
+	 */
+	public enum EdgeBehavior {

Review comment:
       Rename to `EdgeDamBehavior`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);

Review comment:
       provide another constructor `private RequiredShuffle(ShuffleType type)`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {

Review comment:
       Require that `keys` is not empty here.
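
One way this could look, as a sketch rather than the actual change: `hash` rejects an empty key array instead of silently falling back to `ANY`. The class name `RequiredShuffleSketch` and the exception message are hypothetical, and a plain `IllegalArgumentException` stands in for whatever precondition helper the project would use. The single-argument constructor follows the suggestion made in the earlier comment on `unknown()`.

```java
// Hypothetical sketch: hash() requires a non-empty key array.
final class RequiredShuffleSketch {

    enum ShuffleType { ANY, HASH, BROADCAST, SINGLETON, UNKNOWN }

    private final ShuffleType type;
    private final int[] keys;

    // Convenience constructor for shuffle types that carry no keys.
    private RequiredShuffleSketch(ShuffleType type) {
        this(type, new int[0]);
    }

    private RequiredShuffleSketch(ShuffleType type, int[] keys) {
        this.type = type;
        this.keys = keys;
    }

    ShuffleType getType() { return type; }

    int[] getKeys() { return keys; }

    /** Any type of shuffle is acceptable. */
    static RequiredShuffleSketch any() {
        return new RequiredShuffleSketch(ShuffleType.ANY);
    }

    /** Records must be hashed on the given keys; at least one key is required. */
    static RequiredShuffleSketch hash(int[] keys) {
        if (keys == null || keys.length == 0) {
            throw new IllegalArgumentException("Hash shuffle requires at least one key.");
        }
        return new RequiredShuffleSketch(ShuffleType.HASH, keys);
    }
}
```

Callers that have no keys would then call `any()` explicitly, which keeps the intent visible at the call site.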

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);
+		}
+	}
+
+	/**
+	 * Enumeration which describes the shuffle type for records when passing this edge.
+	 */
+	public enum ShuffleType {
+
+		/**
+		 * Any type of shuffle is OK when passing through this edge.
+		 */
+		ANY,
+
+		/**
+		 * Records are shuffle by hash when passing through this edge.
+		 */
+		HASH,
+
+		/**
+		 * Each sub-partition contains full records.

Review comment:
       use `ExecNode` instead of partition to describe the target

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
##########
@@ -117,6 +116,30 @@ class BatchExecExchange(
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] = {
+    val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+    val shuffleMode = getShuffleMode(tableConfig.getConfiguration)
+    if (shuffleMode eq ShuffleMode.BATCH) {
+      List(new ExecEdge(
+        ExecEdge.RequiredShuffle.unknown(),
+        ExecEdge.EdgeBehavior.BLOCKING,
+        0))
+    } else {
+      distribution.getType match {
+        case RelDistribution.Type.RANGE_DISTRIBUTED =>
+          List(new ExecEdge(
+            ExecEdge.RequiredShuffle.unknown(),
+            ExecEdge.EdgeBehavior.BLOCKING,

Review comment:
       Should this be `END_INPUT`?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
##########
@@ -192,6 +193,11 @@ class BatchExecSortMergeJoin(
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] =
+    List(
+      new ExecEdge(ExecEdge.RequiredShuffle.unknown(), ExecEdge.EdgeBehavior.END_INPUT, 0),

Review comment:
       This should take `leftSorted` and `rightSorted` into account.







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831) 
   





[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506903320



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/BatchExecNode.scala
##########
@@ -25,11 +25,4 @@ import org.apache.flink.table.planner.utils.Logging
 /**
   * Base class for batch ExecNode.
   */
-trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] with Logging {
-
-  /**
-    * Returns [[DamBehavior]] of this node.
-    */
-  def getDamBehavior: DamBehavior
-
-}
+trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] with Logging

Review comment:
       nit: remove unused imports

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecUnion.scala
##########
@@ -98,11 +98,11 @@ class BatchExecUnion(
 
   //~ ExecNode methods -----------------------------------------------------------
 
-  override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
-
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT, ExecEdge.DEFAULT)

Review comment:
       A union may have more than two inputs.
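
A small sketch of the idea in Java, assuming the edge list should simply hold one default edge per input. `UnionEdgesSketch` and `edgesFor` are hypothetical names, and the element type is left generic so the example does not depend on the real `ExecEdge` class.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: one default edge per input, however many inputs there are.
final class UnionEdgesSketch {

    /** Returns an immutable list containing {@code inputCount} references to {@code defaultEdge}. */
    static <E> List<E> edgesFor(int inputCount, E defaultEdge) {
        return Collections.nCopies(inputCount, defaultEdge);
    }
}
```

Deriving the list length from the number of inputs avoids hard-coding two edges for a node that may have three or more.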

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/StreamExecNode.scala
##########
@@ -21,7 +21,19 @@ package org.apache.flink.table.planner.plan.nodes.exec
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.utils.Logging
 
+import java.util
+
 /**
   * Base class for stream ExecNode.
   */
-trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging
+trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging {
+
+  def getInputEdges: util.List[ExecEdge] = {
+    // TODO fill out the required shuffle for each stream exec node
+    val edges = new util.ArrayList[ExecEdge]()
+    for (_ <- 0 until getInputNodes.size()) {
+      edges.add(ExecEdge.DEFAULT)
+    }
+    edges

Review comment:
       can be simplified as `getInputNodes.map(_ => ExecEdge.DEFAULT)`

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -146,22 +146,33 @@ class DeadlockBreakupProcessor extends DAGProcessor {
 
   class DeadlockBreakupVisitor(finder: ReuseNodeFinder) extends ExecNodeVisitorImpl {
 
-    private def rewriteJoin(
-        join: BatchExecJoinBase,
-        leftIsBuild: Boolean,
-        distribution: FlinkRelDistribution): Unit = {
-      val (buildSideIndex, probeSideIndex) = if (leftIsBuild) (0, 1) else (1, 0)
-      val buildNode = join.getInputNodes.get(buildSideIndex)
-      val probeNode = join.getInputNodes.get(probeSideIndex)
+    private def rewriteTwoInputNode(
+        node: ExecNode[_, _],
+        leftPriority: Int,
+        requiredShuffle: ExecEdge.RequiredShuffle): Unit = {
+      val (buildSideIndex, probeSideIndex) = if (leftPriority == 0) (0, 1) else (1, 0)
+      val buildNode = node.getInputNodes.get(buildSideIndex)

Review comment:
       There is no build/probe concept here.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -320,20 +334,24 @@ class DeadlockBreakupProcessor extends DAGProcessor {
         // should exclude the reused node (at last position in path)
         while (!hasFullDamNode && idx < inputPath.length - 1) {
           val node = inputPath(idx)
-          val nodeDamBehavior = node.asInstanceOf[BatchExecNode[_]].getDamBehavior
-          hasFullDamNode = if (nodeDamBehavior == DamBehavior.FULL_DAM) {
+          val atLeastEndInput = node.getInputEdges.forall(
+            e => e.getDamBehavior.stricterOrEqual(ExecEdge.DamBehavior.END_INPUT))
+          hasFullDamNode = if (atLeastEndInput) {
             true
           } else {
-            node match {
-              case h: BatchExecHashJoin =>
-                val buildSideIndex = if (h.leftIsBuild) 0 else 1
-                val buildNode = h.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case n: BatchExecNestedLoopJoin =>
-                val buildSideIndex = if (n.leftIsBuild) 0 else 1
-                val buildNode = n.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case _ => false
+            val inputEdges = node.getInputEdges
+            if (inputEdges.size() == 2) {
+              val leftPriority = inputEdges.get(0).getPriority
+              val rightPriority = inputEdges.get(1).getPriority
+              if (leftPriority != rightPriority) {
+                val buildSideIndex = if (leftPriority == 0) 0 else 1

Review comment:
       ditto
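
For readers unfamiliar with the `stricterOrEqual` check used in the hunk above, here is one way such a comparison could be defined. This is an assumption for illustration only: the constant set and their ordering from least to most strict are not shown in this thread, and `DamBehaviorSketch` is a hypothetical name.

```java
// Hypothetical sketch: constants are declared from least to most strict,
// so declaration order can be used for the comparison.
enum DamBehaviorSketch {
    PIPELINED,
    END_INPUT,
    BLOCKING;

    /** Returns true if this behavior is at least as strict as {@code other}. */
    boolean stricterOrEqual(DamBehaviorSketch other) {
        return ordinal() >= other.ordinal();
    }
}
```

Relying on `ordinal()` keeps the method short, but it makes the declaration order part of the contract, so a comment on the enum is advisable.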

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -175,29 +186,32 @@ class DeadlockBreakupProcessor extends DAGProcessor {
               probeRel,
               distribution)
             e.setRequiredShuffleMode(ShuffleMode.BATCH)
-            // replace join node's input
-            join.replaceInputNode(probeSideIndex, e)
+            // replace node's input
+            node.asInstanceOf[BatchExecNode[_]].replaceInputNode(probeSideIndex, e)
         }
       }
     }
 
     override def visit(node: ExecNode[_, _]): Unit = {
       super.visit(node)
-      node match {
-        case hashJoin: BatchExecHashJoin =>
-          val joinInfo = hashJoin.getJoinInfo
-          val columns = if (hashJoin.leftIsBuild) joinInfo.rightKeys else joinInfo.leftKeys
-          val distribution = FlinkRelDistribution.hash(columns)
-          rewriteJoin(hashJoin, hashJoin.leftIsBuild, distribution)
-        case nestedLoopJoin: BatchExecNestedLoopJoin =>
-          rewriteJoin(nestedLoopJoin, nestedLoopJoin.leftIsBuild, FlinkRelDistribution.ANY)
-        case _ => // do nothing
+      val inputEdges = node.getInputEdges
+      if (inputEdges.size() == 2) {
+        val leftPriority = inputEdges.get(0).getPriority
+        val rightPriority = inputEdges.get(1).getPriority
+        val requiredShuffle = if (leftPriority == 1) {

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -146,22 +146,33 @@ class DeadlockBreakupProcessor extends DAGProcessor {
 
   class DeadlockBreakupVisitor(finder: ReuseNodeFinder) extends ExecNodeVisitorImpl {
 
-    private def rewriteJoin(
-        join: BatchExecJoinBase,
-        leftIsBuild: Boolean,
-        distribution: FlinkRelDistribution): Unit = {
-      val (buildSideIndex, probeSideIndex) = if (leftIsBuild) (0, 1) else (1, 0)
-      val buildNode = join.getInputNodes.get(buildSideIndex)
-      val probeNode = join.getInputNodes.get(probeSideIndex)
+    private def rewriteTwoInputNode(
+        node: ExecNode[_, _],
+        leftPriority: Int,
+        requiredShuffle: ExecEdge.RequiredShuffle): Unit = {
+      val (buildSideIndex, probeSideIndex) = if (leftPriority == 0) (0, 1) else (1, 0)

Review comment:
       Is the highest priority always 0? Do we have this guarantee?
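
A sketch of a comparison that does not rely on that guarantee: it picks the input with the smaller priority value, whatever the absolute numbers are. `PrioritySketch` and `firstReadInput` are hypothetical names, and returning -1 for equal priorities is an assumption about how ties would be signalled.

```java
// Hypothetical sketch: decide which of two inputs is read first by comparing
// priorities instead of testing for the literal value 0.
final class PrioritySketch {

    /**
     * Returns 0 if the left input has higher priority (smaller value),
     * 1 if the right input has higher priority, and -1 if both are equal.
     */
    static int firstReadInput(int leftPriority, int rightPriority) {
        if (leftPriority == rightPriority) {
            return -1;
        }
        return leftPriority < rightPriority ? 0 : 1;
    }
}
```

With priorities 3 and 5, a check of the form `leftPriority == 0` would pick the right input, while the comparison above picks the left one.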







[GitHub] [flink] godfreyhe commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506909706



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/StreamExecNode.scala
##########
@@ -21,7 +21,19 @@ package org.apache.flink.table.planner.plan.nodes.exec
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.utils.Logging
 
+import java.util
+
 /**
   * Base class for stream ExecNode.
   */
-trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging
+trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging {
+
+  def getInputEdges: util.List[ExecEdge] = {
+    // TODO fill out the required shuffle for each stream exec node
+    val edges = new util.ArrayList[ExecEdge]()
+    for (_ <- 0 until getInputNodes.size()) {
+      edges.add(ExecEdge.DEFAULT)
+    }
+    edges

Review comment:
       nit: can be simplified as `getInputNodes.map(_ => ExecEdge.DEFAULT)`
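
       For comparison, the same "one default edge per input" idea can be written without an explicit loop on the Java side. This is only a sketch: `DefaultEdgesSketch` and its nested `Edge` class are hypothetical stand-ins for `ExecEdge` and `ExecEdge.DEFAULT`, and `inputCount` stands in for `getInputNodes.size()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in, not part of the PR.
final class DefaultEdgesSketch {

    // Stand-in for ExecEdge with a shared DEFAULT instance.
    static final class Edge {
        static final Edge DEFAULT = new Edge();
    }

    /**
     * Builds a mutable list holding the same default edge once per input.
     * Collections.nCopies returns an immutable view, so it is copied into
     * an ArrayList to match the mutability of the loop in the diff.
     */
    static List<Edge> defaultEdges(int inputCount) {
        return new ArrayList<>(Collections.nCopies(inputCount, Edge.DEFAULT));
    }

    public static void main(String[] args) {
        System.out.println(defaultEdges(2).size());
    }
}
```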







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 5a91c75948265e549afbc512bcc66ef820e1562c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7724) 
   * 4c31824917199ca34801d2bde871c5d1f95d0cc2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7738) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831) 
   * 193e9a611b030c9809eabeb057a31bce9b22b151 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843) 
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee UNKNOWN
   





[GitHub] [flink] TsReaper commented on a change in pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506054619



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+/**
+ * The representation of an edge connecting two {@link ExecNode}.
+ */
+public class ExecEdge {
+
+	private final RequiredShuffle requiredShuffle;
+	private final EdgeBehavior edgeBehavior;
+	// the priority of this edge read by the target node
+	// the smaller the integer, the higher the priority
+	// same integer indicates the same priority
+	private final int priority;
+
+	public ExecEdge(RequiredShuffle requiredShuffle, EdgeBehavior edgeBehavior, int priority) {
+		this.requiredShuffle = requiredShuffle;
+		this.edgeBehavior = edgeBehavior;
+		this.priority = priority;
+	}
+
+	public RequiredShuffle getRequiredShuffle() {
+		return requiredShuffle;
+	}
+
+	public EdgeBehavior getEdgeBehavior() {
+		return edgeBehavior;
+	}
+
+	public int getPriority() {
+		return priority;
+	}
+
+	/**
+	 * The required shuffle for records when passing this edge.
+	 */
+	public static class RequiredShuffle {
+
+		private final ShuffleType type;
+		private final int[] keys;
+
+		private RequiredShuffle(ShuffleType type, int[] keys) {
+			this.type = type;
+			this.keys = keys;
+		}
+
+		public ShuffleType getType() {
+			return type;
+		}
+
+		public int[] getKeys() {
+			return keys;
+		}
+
+		public static RequiredShuffle any() {
+			return new RequiredShuffle(ShuffleType.ANY, new int[0]);
+		}
+
+		public static RequiredShuffle hash(int[] keys) {
+			if (keys.length == 0) {
+				return new RequiredShuffle(ShuffleType.ANY, keys);
+			} else {
+				return new RequiredShuffle(ShuffleType.HASH, keys);
+			}
+		}
+
+		public static RequiredShuffle broadcast() {
+			return new RequiredShuffle(ShuffleType.BROADCAST, new int[0]);
+		}
+
+		public static RequiredShuffle singleton() {
+			return new RequiredShuffle(ShuffleType.SINGLETON, new int[0]);
+		}
+
+		public static RequiredShuffle unknown() {
+			return new RequiredShuffle(ShuffleType.UNKNOWN, new int[0]);
+		}
+	}
+
+	/**
+	 * Enumeration which describes the shuffle type for records when passing this edge.
+	 */
+	public enum ShuffleType {
+
+		/**
+		 * Any type of shuffle is OK when passing through this edge.
+		 */
+		ANY,
+
+		/**
+		 * Records are shuffled by hash when passing through this edge.
+		 */
+		HASH,
+
+		/**
+		 * Each sub-partition contains full records.
+		 */
+		BROADCAST,
+
+		/**
+		 * The parallelism of the target node must be 1.
+		 */
+		SINGLETON,
+
+		/**
+		 * Unknown shuffle type, will be filled out in the future.
+		 */
+		UNKNOWN
+	}
+
+	/**
+	 * Enumeration which describes how an output record from the source node
+	 * may trigger the output of the target node.
+	 */
+	public enum EdgeBehavior {

Review comment:
       `DamBehavior` seems to be a better name. This is an inner class so we do not need to worry that this class might be confused with the `DamBehavior` in runtime.
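
       As a side note on the `RequiredShuffle` factories quoted in the diff above, their behaviour can be tried in isolation. The sketch below is a trimmed, stand-alone copy under a hypothetical name (`RequiredShuffleSketch`), not the real class. It keeps only `any()` and `hash(int[])`, and shows that hashing on an empty key array falls back to `ANY`, as in the diff.

```java
// Trimmed, stand-alone copy of the RequiredShuffle factories from the diff,
// renamed so it cannot be mistaken for the real class.
final class RequiredShuffleSketch {

    enum ShuffleType { ANY, HASH, BROADCAST, SINGLETON, UNKNOWN }

    private final ShuffleType type;
    private final int[] keys;

    private RequiredShuffleSketch(ShuffleType type, int[] keys) {
        this.type = type;
        this.keys = keys;
    }

    ShuffleType getType() {
        return type;
    }

    int[] getKeys() {
        return keys;
    }

    static RequiredShuffleSketch any() {
        return new RequiredShuffleSketch(ShuffleType.ANY, new int[0]);
    }

    // As in the quoted diff: a hash shuffle on no keys degrades to ANY.
    static RequiredShuffleSketch hash(int[] keys) {
        ShuffleType type = keys.length == 0 ? ShuffleType.ANY : ShuffleType.HASH;
        return new RequiredShuffleSketch(type, keys);
    }

    public static void main(String[] args) {
        System.out.println(hash(new int[] {0, 2}).getType());
        System.out.println(hash(new int[0]).getType());
    }
}
```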







[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 2599ad69ed0b2117dff5fd47eb5b51208a2b9efb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7567) 
   * 1d3dfd2b49f980db8ac9189ad4bf0d310e19468c UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 741d91fc746bf5a05bf1ddc6f0e88e3fab3fefd2 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7831) 
   * 193e9a611b030c9809eabeb057a31bce9b22b151 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843) 
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849) 
   





[GitHub] [flink] flinkbot commented on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * 2599ad69ed0b2117dff5fd47eb5b51208a2b9efb UNKNOWN
   





[GitHub] [flink] godfreyhe merged pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
godfreyhe merged pull request #13625:
URL: https://github.com/apache/flink/pull/13625


   





[GitHub] [flink] flinkbot edited a comment on pull request #13625: [FLINK-19623][table-planner-blink] Introduce ExecEdge to describe information on input edges for ExecNode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13625:
URL: https://github.com/apache/flink/pull/13625#issuecomment-708152298


   ## CI report:
   
   * f26a40dc87e7f977e84f24c5b509b60f2b083d5c UNKNOWN
   * 193e9a611b030c9809eabeb057a31bce9b22b151 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7843) 
   * 56fa69f06d5d075aff0ceb8cb721fe41eb3f89ee Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7849) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7868) 
   

