Posted to commits@flink.apache.org by ja...@apache.org on 2021/05/20 14:55:22 UTC

[flink] branch master updated: [FLINK-22155][table] EXPLAIN statement should validate insert and query separately

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new afe0fdd  [FLINK-22155][table] EXPLAIN statement should validate insert and query separately
afe0fdd is described below

commit afe0fddfad0864218329d921215fb3850037d606
Author: zhaown <51...@users.noreply.github.com>
AuthorDate: Thu May 20 22:55:02 2021 +0800

    [FLINK-22155][table] EXPLAIN statement should validate insert and query separately
    
    This closes #15664
---
 .../src/main/codegen/includes/parserImpls.ftl      |  9 +++++++--
 .../src/main/codegen/includes/parserImpls.ftl      |  9 +++++++--
 .../flink/sql/parser/dql/SqlRichExplain.java       |  2 +-
 .../operations/SqlToOperationConverter.java        | 13 +++++++++++-
 .../table/planner/calcite/FlinkPlannerImpl.scala   | 13 +++++++++---
 .../operations/SqlToOperationConverterTest.java    | 19 ++++++++++++++++++
 .../explain/testExecuteSqlWithExplainInsert.out    |  2 +-
 .../table/sqlexec/SqlToOperationConverter.java     | 13 +++++++++++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     | 12 +++++++++--
 .../table/sqlexec/SqlToOperationConverterTest.java | 23 ++++++++++++++++++++++
 .../resources/testExecuteSqlWithExplainInsert0.out |  2 +-
 .../resources/testExecuteSqlWithExplainInsert1.out | 13 +++++++++---
 12 files changed, 113 insertions(+), 17 deletions(-)
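
Before this change, the EXPLAIN grammar accepted a single SqlQueryOrDml() production and the planner validated whatever came back as a query. For EXPLAIN PLAN FOR INSERT ... SELECT ..., the whole INSERT went through query validation, which coerced the projected field names to the sink's columns (the regenerated .out files below show exactly this). The fix splits the grammar into a query alternative and an insert alternative, and validates an INSERT's source query separately from its target table.

A minimal sketch of the two statement shapes EXPLAIN now handles, assuming a TableEnvironment tEnv with tables t1 and t2 already registered (table names borrowed from the new tests):

    // Explaining a plain query worked before and still does.
    tEnv.executeSql("EXPLAIN PLAN FOR SELECT a, b, c, d FROM t2").print();

    // Explaining an INSERT now parses and validates too: the SELECT
    // source is validated on its own and the converted insert is
    // wrapped in an ExplainOperation.
    tEnv.executeSql("EXPLAIN PLAN FOR INSERT INTO t1 SELECT a, b, c, d FROM t2").print();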

diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 6457f32..cf7417d 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1615,7 +1615,12 @@ SqlNode SqlRichExplain() :
 }
 {
     <EXPLAIN> [ <PLAN> <FOR> ]
-    stmt = SqlQueryOrDml() {
-        return new SqlRichExplain(getPos(),stmt);
+    (
+        stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+        |
+        stmt = RichSqlInsert()
+    )
+    {
+        return new SqlRichExplain(getPos(), stmt);
     }
 }
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 81cf8fb..363f8ae 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1602,7 +1602,12 @@ SqlNode SqlRichExplain() :
 }
 {
     <EXPLAIN> [ <PLAN> <FOR> ]
-    stmt = SqlQueryOrDml() {
-        return new SqlRichExplain(getPos(),stmt);
+    (
+        stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+        |
+        stmt = RichSqlInsert()
+    )
+    {
+        return new SqlRichExplain(getPos(), stmt);
     }
 }
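
The Hive-dialect grammar above and the default grammar here receive the identical change: OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) matches plain queries, while RichSqlInsert() matches INSERT statements, including Flink's extensions of Calcite's SqlInsert such as INSERT OVERWRITE and static partition clauses.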
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
index 4df9c6a..716c37f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
@@ -68,7 +68,7 @@ public class SqlRichExplain extends SqlCall {
             statement = operand;
         } else {
             throw new UnsupportedOperationException(
-                    "SqlExplain SqlNode only support index equals 1");
+                    "SqlRichExplain SqlNode only support index equals 1");
         }
     }
 }
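
A small drive-by fix here: the UnsupportedOperationException thrown by setOperand now names the actual class, SqlRichExplain, instead of SqlExplain.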
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 8181d20..3c43806 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -147,6 +147,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -903,7 +904,17 @@ public class SqlToOperationConverter {
 
     /** Convert RICH EXPLAIN statement. */
     private Operation convertRichExplain(SqlRichExplain sqlExplain) {
-        Operation operation = convertSqlQuery(sqlExplain.getStatement());
+        Operation operation;
+        SqlNode sqlNode = sqlExplain.getStatement();
+        if (sqlNode instanceof RichSqlInsert) {
+            operation = convertSqlInsert((RichSqlInsert) sqlNode);
+        } else if (sqlNode instanceof SqlSelect) {
+            operation = convertSqlQuery(sqlExplain.getStatement());
+        } else {
+            throw new ValidationException(
+                    String.format(
+                            "EXPLAIN statement doesn't support %s", sqlNode.getKind().toString()));
+        }
         return new ExplainOperation(operation);
     }
 
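The converter now dispatches on the statement type: a RichSqlInsert goes through convertSqlInsert (yielding a sink-modify operation, CatalogSinkModifyOperation in this codebase), a SqlSelect goes through convertSqlQuery, and any other node is rejected with a ValidationException naming its SqlKind. The new unit tests below exercise the insert path, along the lines of this sketch reusing their helpers:

    // Mirrors the tests added below: an EXPLAIN over an INSERT now
    // converts to an ExplainOperation wrapping the insert's operation
    // instead of failing validation.
    Operation op = parse("explain plan for insert into t1 select a, b, c, d from t2",
            planner, parser);
    assertTrue(op instanceof ExplainOperation);
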
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index e036e17..db3e784 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dml.{SqlBeginStatementSet, SqlEndStatementSet}
+import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlEndStatementSet}
 import org.apache.flink.sql.parser.dql._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
@@ -145,8 +145,15 @@ class FlinkPlannerImpl(
       }
       sqlNode match {
         case richExplain: SqlRichExplain =>
-          val validated = validator.validate(richExplain.getStatement)
-          richExplain.setOperand(0, validated)
+          val validatedStatement = richExplain.getStatement match {
+            case insert: RichSqlInsert =>
+              val validatedSource = validator.validate(insert.getSource)
+              insert.setOperand(2, validatedSource)
+              insert
+            case others =>
+              validator.validate(others)
+          }
+          richExplain.setOperand(0, validatedStatement)
           richExplain
         case _ =>
           validator.validate(sqlNode)
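
For an INSERT, only its SELECT source goes through the validator and is written back with setOperand(2, ...); in Calcite's SqlInsert the operands are the keyword list (0), target table (1), source (2), and column list (3). Skipping whole-insert validation is what keeps the source field names a and b in the explained plan instead of the sink names d and e, as the regenerated .out files below show. A Java rendition of the Scala match, as a sketch rather than planner source:

    SqlNode statement = richExplain.getStatement();
    SqlNode validated;
    if (statement instanceof RichSqlInsert) {
        RichSqlInsert insert = (RichSqlInsert) statement;
        // Validate only the SELECT feeding the insert; the target
        // table is resolved later, during operation conversion.
        SqlNode validatedSource = validator.validate(insert.getSource());
        insert.setOperand(2, validatedSource); // operand 2 = source
        validated = insert;
    } else {
        validated = validator.validate(statement);
    }
    richExplain.setOperand(0, validated);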
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index fe16f55..0acb569 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
@@ -1411,6 +1412,24 @@ public class SqlToOperationConverterTest {
         assertEquals("END", endStatementSetOperation.asSummaryString());
     }
 
+    @Test
+    public void testSqlRichExplainWithSelect() {
+        final String sql = "explain plan for select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertTrue(operation instanceof ExplainOperation);
+    }
+
+    @Test
+    public void testSqlRichExplainWithInsert() {
+        final String sql = "explain plan for insert into t1 select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertTrue(operation instanceof ExplainOperation);
+    }
+
     // ~ Tool Methods ----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
index 57a16d8..7126271 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -1,6 +1,6 @@
 == Abstract Syntax Tree ==
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
-+- LogicalProject(d=[$0], e=[$1])
++- LogicalProject(a=[$0], b=[$1])
    +- LogicalFilter(condition=[>($0, 10)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
 
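The expected AST changes accordingly: with the source validated on its own, the projection keeps the source field names (a, b) rather than the sink names (d, e) that whole-insert validation used to impose; the sink's row type in LogicalLegacySink is unchanged.
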
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 70acb49..01231e3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -105,6 +105,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 
@@ -646,7 +647,17 @@ public class SqlToOperationConverter {
 
     /** Convert RICH EXPLAIN statement. */
     private Operation convertRichExplain(SqlRichExplain sqlExplain) {
-        Operation operation = convertSqlQuery(sqlExplain.getStatement());
+        Operation operation;
+        SqlNode sqlNode = sqlExplain.getStatement();
+        if (sqlNode instanceof RichSqlInsert) {
+            operation = convertSqlInsert((RichSqlInsert) sqlNode);
+        } else if (sqlNode instanceof SqlSelect) {
+            operation = convertSqlQuery(sqlExplain.getStatement());
+        } else {
+            throw new ValidationException(
+                    String.format(
+                            "EXPLAIN statement doesn't support %s", sqlNode.getKind().toString()));
+        }
         return new ExplainOperation(operation);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index a45fa14..f6ed619 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
+import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlRichExplain, SqlShowCatalogs,
   SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases,
   SqlShowFunctions, SqlShowTables, SqlShowViews}
@@ -135,8 +136,15 @@ class FlinkPlannerImpl(
       }
       sqlNode match {
         case richExplain: SqlRichExplain =>
-          val validated = validator.validate(richExplain.getStatement)
-          richExplain.setOperand(0, validated)
+          val validatedStatement = richExplain.getStatement match {
+            case insert: RichSqlInsert =>
+              val validatedSource = validator.validate(insert.getSource)
+              insert.setOperand(2, validatedSource)
+              insert
+            case others =>
+              validator.validate(others)
+          }
+          richExplain.setOperand(0, validatedStatement)
           richExplain
         case _ =>
           validator.validate(sqlNode)
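
The legacy planner mirrors both Blink-planner changes: its converter dispatches on RichSqlInsert versus SqlSelect, and its validator handles an INSERT's source separately, differing only in imports and surrounding context.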
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 5e44e92..1ab0f03 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.sqlexec;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichExplain;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.SqlParserException;
@@ -49,6 +50,7 @@ import org.apache.flink.table.expressions.PlannerExpressionConverter;
 import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
@@ -100,6 +102,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 
 /** Test cases for {@link SqlToOperationConverter}. * */
@@ -717,6 +720,26 @@ public class SqlToOperationConverterTest {
         assertFailedSetCommand("SET execution.runtime-type=");
     }
 
+    @Test
+    public void testSqlRichExplainWithSelect() {
+        final String sql = "explain plan for select b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+        assertTrue(node instanceof SqlRichExplain);
+        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
+        assertTrue(operation instanceof ExplainOperation);
+    }
+
+    @Test
+    public void testSqlRichExplainWithInsert() {
+        final String sql = "explain plan for insert into t1 select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+        assertTrue(node instanceof SqlRichExplain);
+        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
+        assertTrue(operation instanceof ExplainOperation);
+    }
+
     // ~ Tool Methods ----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {
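
Unlike the Blink-planner tests, these legacy-planner tests drive the parser and converter explicitly: they first assert that the parsed node is a SqlRichExplain, then that conversion produces an ExplainOperation.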
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
index aad6387..bbd0f53 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
@@ -1,6 +1,6 @@
 == Abstract Syntax Tree ==
 LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
-  LogicalProject(d=[$0], e=[$1])
+  LogicalProject(a=[$0], b=[$1])
     LogicalFilter(condition=[>($0, 10)])
       LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
index 98206ae..b904056 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -1,6 +1,6 @@
 == Abstract Syntax Tree ==
 LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
-  LogicalProject(d=[$0], e=[$1])
+  LogicalProject(a=[$0], b=[$1])
     LogicalFilter(condition=[>($0, 10)])
       LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 
@@ -28,9 +28,16 @@ DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
 			driver_strategy : FlatMap
 			Partitioning : RANDOM_PARTITIONED
 
-			 : Data Sink
-				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+			 : Map
+				content : to: Row
 				ship_strategy : Forward
 				exchange_mode : PIPELINED
+				driver_strategy : Map
 				Partitioning : RANDOM_PARTITIONED
 
+				 : Data Sink
+					content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+					ship_strategy : Forward
+					exchange_mode : PIPELINED
+					Partitioning : RANDOM_PARTITIONED
+
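
The legacy-planner .out files are regenerated as well: besides the renamed projection, the expected physical plan for testExecuteSqlWithExplainInsert1 now routes through an extra Map (to: Row) into a LocalCollectionOutputFormat sink instead of a DiscardingOutputFormat.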