You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/01 10:03:03 UTC

[GitHub] [flink] twalthr commented on a change in pull request #18885: [FLINK-26131][table] CompiledPlan implements Executable

twalthr commented on a change in pull request #18885:
URL: https://github.com/apache/flink/pull/18885#discussion_r816593184



##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java
##########
@@ -46,9 +45,7 @@ public void testProjectPushDown() throws Exception {
         createSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
         File sinkPath = createSinkTable("MySink", "a bigint", "c varchar");
 
-        CompiledPlan compiledPlan =
-                tableEnv.compilePlanSql("insert into MySink select a, c from MyTable");
-        tableEnv.executePlan(compiledPlan).await();
+        tableEnv.compilePlanSql("insert into MySink select a, c from MyTable").execute().await();

Review comment:
       why does the `TableCsvFormatITCase` uses a compiled plan? Looks like somebody didn't know about `executeSql`?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
##########
@@ -27,7 +27,7 @@
 import java.nio.file.Paths;
 
 /**
- * Represents a static, executable entity that has been compiled from a Table & SQL API pipeline
+ * Represents an executable entity that has been compiled from a Table & SQL API pipeline

Review comment:
       with `static` I meant to emphasize `immutable` or `fully optimized`

##########
File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
##########
@@ -50,12 +50,12 @@ public String explain(List<Operation> operations, ExplainDetail... extraDetails)
     }
 
     @Override
-    public CompiledPlan loadPlan(PlanReference planReference) throws IOException {
+    public CompiledPlanInternalFactory loadPlan(PlanReference planReference) throws IOException {

Review comment:
       I don't like this additional indirection. The long `CompiledPlanInternalFactory` name already shows how "not so nice" the current approach is compared to the simplicity of the current `Planner` interface.
   
   How about we simply throw an `UnsupportedOperationException` in the `CompiledPlan.execute()` returned by the planner by default and the TableEnvironment can simply wrap the `CompiledPlan` one more time to provide the executable functionality.
   
   Another option would be to pass `TableEnvironmentInternal` to `loadPlan` and `compilePlan` but this is also not very nice because it creates a cyclic dependency.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
##########
@@ -126,4 +127,37 @@
      */
     @Experimental
     CompiledPlan compilePlan(List<ModifyOperation> operations);
+
+    /**

Review comment:
       remove all JavaDocs. this is an internal class. it can only become outdated.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -1103,54 +1103,21 @@ void createFunction(
      *
      * <p>Note: The compiled plan feature is not supported in batch mode.
      *
-     * @see #executePlan(CompiledPlan)
+     * @see CompiledPlan#execute()
      * @see #loadPlan(PlanReference)
      * @throws TableException if the SQL statement is invalid or if the plan cannot be persisted.
      */
     @Experimental
     CompiledPlan compilePlanSql(String stmt) throws TableException;
 
     /**
-     * Executes the provided {@link CompiledPlan}.
+     * Shorthand for {@code tEnv.loadPlan(planReference).execute()}.
      *
-     * <p>Compiled plans can be persisted and reloaded across Flink versions. They describe static
-     * pipelines to ensure backwards compatibility and enable stateful streaming job upgrades. See
-     * {@link CompiledPlan} and the website documentation for more information.
-     *
-     * <p>If a job is resumed from a savepoint, it will eventually resume the execution.
-     *
-     * <p>Note: The compiled plan feature is not supported in batch mode.
-     *
-     * @see #compilePlanSql(String)
      * @see #loadPlan(PlanReference)
-     */
-    @Experimental
-    TableResult executePlan(CompiledPlan plan);
-
-    /**
-     * Shorthand for {@code tEnv.executePlan(tEnv.loadPlan(planReference))}.
-     *
-     * @see #loadPlan(PlanReference)
-     * @see #executePlan(CompiledPlan)
+     * @see CompiledPlan#execute()

Review comment:
       also link to `#load`

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
##########
@@ -135,12 +135,13 @@ class BatchPlanner(
     new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
   }
 
-  override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
+  override def loadPlan(planReference: PlanReference): CompiledPlanInternalFactory = {
     throw new UnsupportedOperationException(
       "The compiled plan feature is not supported in batch mode.")
   }
 
-  override def compilePlan(modifyOperations: util.List[ModifyOperation]): CompiledPlanInternal =
+  override def compilePlan(

Review comment:
       fix formatting

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
##########
@@ -155,27 +155,18 @@ class StreamPlanner(
         "Unknown PlanReference. This is a bug, please contact the developers")
     }
 
-    new ExecNodeGraphCompiledPlan(
-      this,
-      JsonSerdeUtil.createObjectWriter(createSerdeContext)
-        .withDefaultPrettyPrinter()
-        .writeValueAsString(execNodeGraph),
-      execNodeGraph)
+    createCompiledPlanInternalFactory(ctx, execNodeGraph)
   }
 
-  override def compilePlan(modifyOperations: util.List[ModifyOperation]): CompiledPlanInternal = {
+  override def compilePlan(

Review comment:
       formatting here and below




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org