Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 09:49:31 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #20262: [FLINK-28536][table-planner] Adds an internal abstraction PhysicalDagProcessor for physical plan processing

lincoln-lil opened a new pull request, #20262:
URL: https://github.com/apache/flink/pull/20262

   ## What is the purpose of the change
   Adds an internal abstraction PhysicalDagProcessor for physical plan processing.
   
   ## Brief change log
   * Adds a new internal abstraction: PhysicalDagProcessor
   * Lets SubplanReuser extend PhysicalDagProcessor
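A minimal sketch of what an abstraction for processing a physical DAG could look like. Everything in it is hypothetical: `PhysNode` is a simplified stand-in for Calcite's `RelNode`, the `process(roots)` signature is an assumption rather than the one in the PR, and the two example processors only illustrate the "validation or rewriting" purposes mentioned in the diff. In the PR itself, `SubplanReuser` is the class adapted to the abstraction.

```scala
// Hypothetical, simplified stand-in for a physical plan node (not Calcite's RelNode).
final case class PhysNode(name: String, inputs: Seq[PhysNode] = Nil)

// Assumed shape of the abstraction: take the plan roots, return (possibly rewritten) roots.
trait PhysicalDagProcessor {
  def process(roots: Seq[PhysNode]): Seq[PhysNode]
}

// Hypothetical validating processor: rejects plans containing a node named "Unsupported".
object RejectUnsupported extends PhysicalDagProcessor {
  private def check(node: PhysNode): Unit = {
    require(node.name != "Unsupported", s"unsupported node in plan: ${node.name}")
    node.inputs.foreach(check)
  }

  override def process(roots: Seq[PhysNode]): Seq[PhysNode] = {
    roots.foreach(check)
    roots // validation only, the plan is returned unchanged
  }
}

// Hypothetical rewriting processor: drops single-input pass-through "Identity" nodes.
object RemoveIdentity extends PhysicalDagProcessor {
  private def rewrite(node: PhysNode): PhysNode = node match {
    case PhysNode("Identity", Seq(child)) => rewrite(child)
    case PhysNode(name, inputs)           => PhysNode(name, inputs.map(rewrite))
  }

  override def process(roots: Seq[PhysNode]): Seq[PhysNode] = roots.map(rewrite)
}

object DagProcessing {
  // Applies the processors in order, feeding each one the output of the previous one.
  def runAll(roots: Seq[PhysNode], processors: Seq[PhysicalDagProcessor]): Seq[PhysNode] =
    processors.foldLeft(roots)((current, processor) => processor.process(current))
}
```

Chaining processors with `foldLeft` keeps each one independent; a reuse step like `SubplanReuser` would simply be another element of the sequence.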
   
   ## Verifying this change
   Covered by existing test cases.
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   
   ## Documentation
     - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lincoln-lil commented on a diff in pull request #20262: [FLINK-28536][table-planner] Adds an internal postOptimize method for physical dag processing

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20262:
URL: https://github.com/apache/flink/pull/20262#discussion_r920155207


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala:
##########
@@ -79,9 +82,23 @@ abstract class CommonSubGraphBasedOptimizer extends Optimizer {
         require(plan != null)
         plan
     }
-    expandIntermediateTableScan(optimizedPlan)
+    val expanded = expandIntermediateTableScan(optimizedPlan)
+
+    val postOptimizedPlan = postOptimize(expanded)
+
+    // Rewrite same rel object to different rel objects
+    // in order to get the correct dag (dag reuse is based on object not digest)
+    val shuttle = new SameRelObjectShuttle()
+    val relsWithoutSameObj = postOptimizedPlan.map(_.accept(shuttle))
+
+    // reuse subplan
+    val tableConfig = roots.head.getTable.unwrap(classOf[TableConfig])
+    SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
   }
 
+  /** Post process for the physical [[RelNode]] dag, e.g., validation or rewriting purpose. */
+  protected def postOptimize(expanded: Seq[RelNode]): Seq[RelNode]

Review Comment:
   Will change it, since you have the same doubts.
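The diff comments that shared rel objects are rewritten into distinct objects because DAG reuse is based on object identity, not on digests. A minimal sketch of that idea under stated assumptions: `Node` is a hypothetical plain class (so `==` is reference equality) with a simplified digest built from its name and inputs, and `SameObjectSplitter`, `DigestReuser` and `DagStats` are illustrative names, not Flink's `SameRelObjectShuttle` or `SubplanReuser`. Unlike the real reuser, which receives a `TableConfig`, this sketch reuses every duplicate unconditionally.

```scala
// Hypothetical plan node; a plain class, so equality is by reference.
final class Node(val name: String, val inputs: List[Node]) {
  // Simplified structural digest: equal digests mean structurally equal subplans.
  lazy val digest: String =
    if (inputs.isEmpty) name else inputs.map(_.digest).mkString(s"$name(", ",", ")")
}

object SameObjectSplitter {
  // Rebuilds the plan so every position gets a freshly allocated node,
  // even where the input DAG shared a single object between parents.
  def split(node: Node): Node = new Node(node.name, node.inputs.map(split))
}

object DigestReuser {
  // Digest-based reuse: structurally equal subplans collapse into one shared object.
  def reuse(roots: List[Node]): List[Node] = {
    val byDigest = scala.collection.mutable.HashMap.empty[String, Node]
    def visit(node: Node): Node = {
      val newInputs = node.inputs.map(visit)
      byDigest.getOrElseUpdate(node.digest, new Node(node.name, newInputs))
    }
    roots.map(visit)
  }
}

object DagStats {
  // Counts distinct node objects reachable from the roots (by reference, not digest).
  def distinctObjects(roots: List[Node]): Int = {
    val seen = java.util.Collections.newSetFromMap(
      new java.util.IdentityHashMap[Node, java.lang.Boolean]())
    def walk(n: Node): Unit = if (seen.add(n)) n.inputs.foreach(walk)
    roots.foreach(walk)
    seen.size()
  }
}
```

After the split step, object sharing no longer carries any meaning, so the only reuse left in the final DAG is what the digest-based step decides.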




[GitHub] [flink] flinkbot commented on pull request #20262: [FLINK-28536][table-planner] Adds an internal abstraction PhysicalDagProcessor for physical plan processing

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20262:
URL: https://github.com/apache/flink/pull/20262#issuecomment-1183016045

   ## CI report:
   
   * d55b2fa927e9492a896970d7e1865bdcc6dc2edf UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure`: re-run the last Azure build



[GitHub] [flink] godfreyhe commented on a diff in pull request #20262: [FLINK-28536][table-planner] Adds an internal postOptimize method for physical dag processing

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20262:
URL: https://github.com/apache/flink/pull/20262#discussion_r920056880


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala:
##########
@@ -79,9 +82,23 @@ abstract class CommonSubGraphBasedOptimizer extends Optimizer {
         require(plan != null)
         plan
     }
-    expandIntermediateTableScan(optimizedPlan)
+    val expanded = expandIntermediateTableScan(optimizedPlan)
+
+    val postOptimizedPlan = postOptimize(expanded)
+
+    // Rewrite same rel object to different rel objects
+    // in order to get the correct dag (dag reuse is based on object not digest)
+    val shuttle = new SameRelObjectShuttle()
+    val relsWithoutSameObj = postOptimizedPlan.map(_.accept(shuttle))
+
+    // reuse subplan
+    val tableConfig = roots.head.getTable.unwrap(classOf[TableConfig])
+    SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
   }
 
+  /** Post process for the physical [[RelNode]] dag, e.g., validation or rewriting purpose. */
+  protected def postOptimize(expanded: Seq[RelNode]): Seq[RelNode]

Review Comment:
   nit: give it a default implementation?
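The nit asks for a default implementation of the abstract `postOptimize` hook. A minimal sketch of that template-method shape, assuming a pass-through default: `Rel`, `SketchOptimizer` and its subclasses are hypothetical stand-ins for `RelNode` and `CommonSubGraphBasedOptimizer`, and the shared-object rewriting and subplan-reuse steps from the diff are left out.

```scala
// Hypothetical stand-in for RelNode.
final case class Rel(digest: String)

abstract class SketchOptimizer {
  // Fixed pipeline owned by the base class; subclasses customize only the hooks.
  final def optimize(roots: Seq[Rel]): Seq[Rel] = postOptimize(expand(roots))

  protected def expand(roots: Seq[Rel]): Seq[Rel] = roots

  // Default hook: pass the physical plan through unchanged.
  protected def postOptimize(expanded: Seq[Rel]): Seq[Rel] = expanded
}

// Relies on the default, so it needs no override at all.
class PlainSketchOptimizer extends SketchOptimizer

// Overrides the hook for validation, one of the purposes named in the diff's doc comment.
class ValidatingSketchOptimizer extends SketchOptimizer {
  override protected def postOptimize(expanded: Seq[Rel]): Seq[Rel] = {
    expanded.foreach(r => require(!r.digest.contains("Unsupported"), s"invalid plan: ${r.digest}"))
    expanded
  }
}
```

With a pass-through default, optimizers that need no post-processing do not have to implement the hook.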




[GitHub] [flink] lincoln-lil commented on pull request #20262: [FLINK-28536][table-planner] Adds an internal postOptimize method for physical dag processing

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #20262:
URL: https://github.com/apache/flink/pull/20262#issuecomment-1183863653

   @godfreyhe thank you for reviewing this! I've updated the PR according to your comments.



[GitHub] [flink] godfreyhe closed pull request #20262: [FLINK-28536][table-planner] Adds an internal postOptimize method for physical dag processing

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #20262: [FLINK-28536][table-planner] Adds an internal postOptimize method for physical dag processing
URL: https://github.com/apache/flink/pull/20262

