You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/15 14:08:15 UTC

[flink] 01/02: [FLINK-30668][table-api] Introduce ExplainFormat to Explainable and TableEnvironment

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91fc5ba576696d64d7a65c040875e86702188b63
Author: Jane Chan <qi...@gmail.com>
AuthorDate: Fri Jan 13 14:25:47 2023 +0800

    [FLINK-30668][table-api] Introduce ExplainFormat to Explainable and TableEnvironment
    
    This closes #21662
---
 .../api/{Explainable.java => ExplainFormat.java}   | 26 ++++------------------
 .../org/apache/flink/table/api/Explainable.java    | 15 ++++++++++++-
 .../apache/flink/table/api/TableEnvironment.java   | 16 ++++++++++++-
 .../flink/table/api/internal/CompiledPlanImpl.java |  3 ++-
 .../flink/table/api/internal/StatementSetImpl.java |  5 +++--
 .../table/api/internal/TableEnvironmentImpl.java   |  9 +++++---
 .../api/internal/TableEnvironmentInternal.java     | 17 +++++++++++++-
 .../apache/flink/table/api/internal/TableImpl.java |  5 +++--
 .../table/api/internal/TablePipelineImpl.java      |  5 +++--
 9 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainFormat.java
similarity index 51%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainFormat.java
index 59531107510..cc09103a4da 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainFormat.java
@@ -20,28 +20,10 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-/**
- * Represents an artifact that can be explained using a summary string.
- *
- * @see #explain(ExplainDetail...)
- */
+/** Categorizes the output format of an explain result. */
 @PublicEvolving
-public interface Explainable<SELF extends Explainable<SELF>> {
-
-    /**
-     * Returns the AST of this object and the execution plan to compute the result of the given
-     * statement.
-     *
-     * @param extraDetails The extra explain details which the result of this method should include,
-     *     e.g. estimated cost, changelog mode for streaming
-     * @return AST and the execution plan.
-     */
-    String explain(ExplainDetail... extraDetails);
+public enum ExplainFormat {
 
-    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
-    @SuppressWarnings("unchecked")
-    default SELF printExplain(ExplainDetail... extraDetails) {
-        System.out.println(explain(extraDetails));
-        return (SELF) this;
-    }
+    /** Explains an {@link Explainable} in plain text format. */
+    TEXT
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java
index 59531107510..fc7e981f6d0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Explainable.java
@@ -36,7 +36,20 @@ public interface Explainable<SELF extends Explainable<SELF>> {
      *     e.g. estimated cost, changelog mode for streaming
      * @return AST and the execution plan.
      */
-    String explain(ExplainDetail... extraDetails);
+    default String explain(ExplainDetail... extraDetails) {
+        return explain(ExplainFormat.TEXT, extraDetails);
+    }
+
+    /**
+     * Returns the AST of this object and the execution plan to compute the result of the given
+     * statement.
+     *
+     * @param format The output format of the explained plan.
+     * @param extraDetails The extra explain details which the result of this method should include,
+     *     e.g. estimated cost, changelog mode for streaming
+     * @return AST and the execution plan.
+     */
+    String explain(ExplainFormat format, ExplainDetail... extraDetails);
 
     /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
     @SuppressWarnings("unchecked")
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index c47b50bcf52..01d5f174dbf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -970,7 +970,21 @@ public interface TableEnvironment {
      *     estimated cost, changelog mode for streaming, displaying execution plan in json format
      * @return AST and the execution plan.
      */
-    String explainSql(String statement, ExplainDetail... extraDetails);
+    default String explainSql(String statement, ExplainDetail... extraDetails) {
+        return explainSql(statement, ExplainFormat.TEXT, extraDetails);
+    }
+
+    /**
+     * Returns the AST of the specified statement and the execution plan to compute the result of
+     * the given statement.
+     *
+     * @param statement The statement for which the AST and execution plan will be returned.
+     * @param format The output format of the explained plan.
+     * @param extraDetails The extra explain details which the explain result should include, e.g.
+     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
+     * @return AST and the execution plan.
+     */
+    String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails);
 
     /**
      * Returns completion hints for the given statement at the given cursor position. The completion
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompiledPlanImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompiledPlanImpl.java
index 6efa375eaf7..ea2b399997c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompiledPlanImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompiledPlanImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.FlinkVersion;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.delegation.InternalPlan;
@@ -68,7 +69,7 @@ class CompiledPlanImpl implements CompiledPlan {
     }
 
     @Override
-    public String explain(ExplainDetail... extraDetails) {
+    public String explain(ExplainFormat format, ExplainDetail... extraDetails) {
         return tableEnvironment.explainPlan(internalPlan, extraDetails);
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
index 2233a5babee..0f349dcd54f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableDescriptor;
@@ -96,10 +97,10 @@ public class StatementSetImpl<E extends TableEnvironmentInternal> implements Sta
     }
 
     @Override
-    public String explain(ExplainDetail... extraDetails) {
+    public String explain(ExplainFormat format, ExplainDetail... extraDetails) {
         List<Operation> operationList =
                 operations.stream().map(o -> (Operation) o).collect(Collectors.toList());
-        return tableEnvironment.explainInternal(operationList, extraDetails);
+        return tableEnvironment.explainInternal(operationList, format, extraDetails);
     }
 
     @Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f33c2c31f2b..c8797876b91 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.SqlParserException;
@@ -689,7 +690,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     }
 
     @Override
-    public String explainSql(String statement, ExplainDetail... extraDetails) {
+    public String explainSql(
+            String statement, ExplainFormat format, ExplainDetail... extraDetails) {
         List<Operation> operations = getParser().parse(statement);
 
         if (operations.size() != 1) {
@@ -701,11 +703,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             operations =
                     new ArrayList<>(((StatementSetOperation) operations.get(0)).getOperations());
         }
-        return explainInternal(operations, extraDetails);
+        return explainInternal(operations, format, extraDetails);
     }
 
     @Override
-    public String explainInternal(List<Operation> operations, ExplainDetail... extraDetails) {
+    public String explainInternal(
+            List<Operation> operations, ExplainFormat format, ExplainDetail... extraDetails) {
         operations =
                 operations.stream()
                         .filter(o -> !(o instanceof NopOperation))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
index 36731abac8c..2f31e30ab44 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -91,7 +92,21 @@ public interface TableEnvironmentInternal extends TableEnvironment {
      *     estimated cost, changelog mode for streaming
      * @return AST and the execution plan.
      */
-    String explainInternal(List<Operation> operations, ExplainDetail... extraDetails);
+    default String explainInternal(List<Operation> operations, ExplainDetail... extraDetails) {
+        return explainInternal(operations, ExplainFormat.TEXT, extraDetails);
+    }
+
+    /**
+     * Returns the AST of the given operations and the execution plan to compute their result.
+     *
+     * @param operations The operations to be explained.
+     * @param format The output format.
+     * @param extraDetails The extra explain details which the explain result should include, e.g.
+     *     estimated cost, changelog mode for streaming
+     * @return AST and the execution plan.
+     */
+    String explainInternal(
+            List<Operation> operations, ExplainFormat format, ExplainDetail... extraDetails);
 
     /**
      * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 06006e72565..4eece5227b1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.AggregatedTable;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.FlatAggregateTable;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.GroupWindowedTable;
@@ -476,9 +477,9 @@ public class TableImpl implements Table {
     }
 
     @Override
-    public String explain(ExplainDetail... extraDetails) {
+    public String explain(ExplainFormat format, ExplainDetail... extraDetails) {
         return tableEnvironment.explainInternal(
-                Collections.singletonList(getQueryOperation()), extraDetails);
+                Collections.singletonList(getQueryOperation()), format, extraDetails);
     }
 
     @Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TablePipelineImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TablePipelineImpl.java
index ce108322fe2..ab49eace98f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TablePipelineImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TablePipelineImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal;
 
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TablePipeline;
 import org.apache.flink.table.api.TableResult;
@@ -57,8 +58,8 @@ class TablePipelineImpl implements TablePipeline {
     }
 
     @Override
-    public String explain(ExplainDetail... extraDetails) {
-        return tableEnvironment.explainInternal(singletonList(operation), extraDetails);
+    public String explain(ExplainFormat format, ExplainDetail... extraDetails) {
+        return tableEnvironment.explainInternal(singletonList(operation), format, extraDetails);
     }
 
     @Override