You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 14:33:08 UTC

[GitHub] [flink] lincoln-lil commented on a diff in pull request #20262: [FLINK-28536][table-planner] Adds an internal postOptimize method for physical dag processing

lincoln-lil commented on code in PR #20262:
URL: https://github.com/apache/flink/pull/20262#discussion_r920155207


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala:
##########
@@ -79,9 +82,23 @@ abstract class CommonSubGraphBasedOptimizer extends Optimizer {
         require(plan != null)
         plan
     }
-    expandIntermediateTableScan(optimizedPlan)
+    val expanded = expandIntermediateTableScan(optimizedPlan)
+
+    val postOptimizedPlan = postOptimize(expanded)
+
+    // Rewrite same rel object to different rel objects
+    // in order to get the correct dag (dag reuse is based on object not digest)
+    val shuttle = new SameRelObjectShuttle()
+    val relsWithoutSameObj = postOptimizedPlan.map(_.accept(shuttle))
+
+    // reuse subplan
+    val tableConfig = roots.head.getTable.unwrap(classOf[TableConfig])
+    SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
   }
 
+  /** Post process for the physical [[RelNode]] dag, e.g., validation or rewriting purpose. */
+  protected def postOptimize(expanded: Seq[RelNode]): Seq[RelNode]

Review Comment:
   will change it since you have the same doubts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org