You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/05/20 14:55:22 UTC
[flink] branch master updated: [FLINK-22155][table] EXPLAIN statement should validate insert and query separately
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new afe0fdd [FLINK-22155][table] EXPLAIN statement should validate insert and query separately
afe0fdd is described below
commit afe0fddfad0864218329d921215fb3850037d606
Author: zhaown <51...@users.noreply.github.com>
AuthorDate: Thu May 20 22:55:02 2021 +0800
[FLINK-22155][table] EXPLAIN statement should validate insert and query separately
This closes #15664
---
.../src/main/codegen/includes/parserImpls.ftl | 9 +++++++--
.../src/main/codegen/includes/parserImpls.ftl | 9 +++++++--
.../flink/sql/parser/dql/SqlRichExplain.java | 2 +-
.../operations/SqlToOperationConverter.java | 13 +++++++++++-
.../table/planner/calcite/FlinkPlannerImpl.scala | 13 +++++++++---
.../operations/SqlToOperationConverterTest.java | 19 ++++++++++++++++++
.../explain/testExecuteSqlWithExplainInsert.out | 2 +-
.../table/sqlexec/SqlToOperationConverter.java | 13 +++++++++++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 12 +++++++++--
.../table/sqlexec/SqlToOperationConverterTest.java | 23 ++++++++++++++++++++++
.../resources/testExecuteSqlWithExplainInsert0.out | 2 +-
.../resources/testExecuteSqlWithExplainInsert1.out | 13 +++++++++---
12 files changed, 113 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 6457f32..cf7417d 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1615,7 +1615,12 @@ SqlNode SqlRichExplain() :
}
{
<EXPLAIN> [ <PLAN> <FOR> ]
- stmt = SqlQueryOrDml() {
- return new SqlRichExplain(getPos(),stmt);
+ (
+ stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+ |
+ stmt = RichSqlInsert()
+ )
+ {
+ return new SqlRichExplain(getPos(), stmt);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 81cf8fb..363f8ae 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1602,7 +1602,12 @@ SqlNode SqlRichExplain() :
}
{
<EXPLAIN> [ <PLAN> <FOR> ]
- stmt = SqlQueryOrDml() {
- return new SqlRichExplain(getPos(),stmt);
+ (
+ stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+ |
+ stmt = RichSqlInsert()
+ )
+ {
+ return new SqlRichExplain(getPos(), stmt);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
index 4df9c6a..716c37f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
@@ -68,7 +68,7 @@ public class SqlRichExplain extends SqlCall {
statement = operand;
} else {
throw new UnsupportedOperationException(
- "SqlExplain SqlNode only support index equals 1");
+ "SqlRichExplain SqlNode only support index equals 1");
}
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 8181d20..3c43806 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -147,6 +147,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
@@ -903,7 +904,17 @@ public class SqlToOperationConverter {
/** Convert RICH EXPLAIN statement. */
private Operation convertRichExplain(SqlRichExplain sqlExplain) {
- Operation operation = convertSqlQuery(sqlExplain.getStatement());
+ Operation operation;
+ SqlNode sqlNode = sqlExplain.getStatement();
+ if (sqlNode instanceof RichSqlInsert) {
+ operation = convertSqlInsert((RichSqlInsert) sqlNode);
+ } else if (sqlNode instanceof SqlSelect) {
+ operation = convertSqlQuery(sqlExplain.getStatement());
+ } else {
+ throw new ValidationException(
+ String.format(
+ "EXPLAIN statement doesn't support %s", sqlNode.getKind().toString()));
+ }
return new ExplainOperation(operation);
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index e036e17..db3e784 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dml.{SqlBeginStatementSet, SqlEndStatementSet}
+import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlEndStatementSet}
import org.apache.flink.sql.parser.dql._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
@@ -145,8 +145,15 @@ class FlinkPlannerImpl(
}
sqlNode match {
case richExplain: SqlRichExplain =>
- val validated = validator.validate(richExplain.getStatement)
- richExplain.setOperand(0, validated)
+ val validatedStatement = richExplain.getStatement match {
+ case insert: RichSqlInsert =>
+ val validatedSource = validator.validate(insert.getSource)
+ insert.setOperand(2, validatedSource)
+ insert
+ case others =>
+ validator.validate(others)
+ }
+ richExplain.setOperand(0, validatedStatement)
richExplain
case _ =>
validator.validate(sqlNode)
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index fe16f55..0acb569 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
@@ -1411,6 +1412,24 @@ public class SqlToOperationConverterTest {
assertEquals("END", endStatementSetOperation.asSummaryString());
}
+ @Test
+ public void testSqlRichExplainWithSelect() {
+ final String sql = "explain plan for select a, b, c, d from t2";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+ Operation operation = parse(sql, planner, parser);
+ assertTrue(operation instanceof ExplainOperation);
+ }
+
+ @Test
+ public void testSqlRichExplainWithInsert() {
+ final String sql = "explain plan for insert into t1 select a, b, c, d from t2";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+ Operation operation = parse(sql, planner, parser);
+ assertTrue(operation instanceof ExplainOperation);
+ }
+
// ~ Tool Methods ----------------------------------------------------------
private static TestItem createTestItem(Object... args) {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
index 57a16d8..7126271 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -1,6 +1,6 @@
== Abstract Syntax Tree ==
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
-+- LogicalProject(d=[$0], e=[$1])
++- LogicalProject(a=[$0], b=[$1])
+- LogicalFilter(condition=[>($0, 10)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 70acb49..01231e3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -105,6 +105,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
@@ -646,7 +647,17 @@ public class SqlToOperationConverter {
/** Convert RICH EXPLAIN statement. */
private Operation convertRichExplain(SqlRichExplain sqlExplain) {
- Operation operation = convertSqlQuery(sqlExplain.getStatement());
+ Operation operation;
+ SqlNode sqlNode = sqlExplain.getStatement();
+ if (sqlNode instanceof RichSqlInsert) {
+ operation = convertSqlInsert((RichSqlInsert) sqlNode);
+ } else if (sqlNode instanceof SqlSelect) {
+ operation = convertSqlQuery(sqlExplain.getStatement());
+ } else {
+ throw new ValidationException(
+ String.format(
+ "EXPLAIN statement doesn't support %s", sqlNode.getKind().toString()));
+ }
return new ExplainOperation(operation);
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index a45fa14..f6ed619 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.calcite
import org.apache.flink.sql.parser.ExtendedSqlNode
+import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlRichExplain, SqlShowCatalogs,
SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases,
SqlShowFunctions, SqlShowTables, SqlShowViews}
@@ -135,8 +136,15 @@ class FlinkPlannerImpl(
}
sqlNode match {
case richExplain: SqlRichExplain =>
- val validated = validator.validate(richExplain.getStatement)
- richExplain.setOperand(0, validated)
+ val validatedStatement = richExplain.getStatement match {
+ case insert: RichSqlInsert =>
+ val validatedSource = validator.validate(insert.getSource)
+ insert.setOperand(2, validatedSource)
+ insert
+ case others =>
+ validator.validate(others)
+ }
+ richExplain.setOperand(0, validatedStatement)
richExplain
case _ =>
validator.validate(sqlNode)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 5e44e92..1ab0f03 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.sqlexec;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichExplain;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.SqlParserException;
@@ -49,6 +50,7 @@ import org.apache.flink.table.expressions.PlannerExpressionConverter;
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
@@ -100,6 +102,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
/** Test cases for {@link SqlToOperationConverter}. * */
@@ -717,6 +720,26 @@ public class SqlToOperationConverterTest {
assertFailedSetCommand("SET execution.runtime-type=");
}
+ @Test
+ public void testSqlRichExplainWithSelect() {
+ final String sql = "explain plan for select b, c, d from t2";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+ assertTrue(node instanceof SqlRichExplain);
+ Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
+ assertTrue(operation instanceof ExplainOperation);
+ }
+
+ @Test
+ public void testSqlRichExplainWithInsert() {
+ final String sql = "explain plan for insert into t1 select a, b, c, d from t2";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+ assertTrue(node instanceof SqlRichExplain);
+ Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
+ assertTrue(operation instanceof ExplainOperation);
+ }
+
// ~ Tool Methods ----------------------------------------------------------
private static TestItem createTestItem(Object... args) {
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
index aad6387..bbd0f53 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
@@ -1,6 +1,6 @@
== Abstract Syntax Tree ==
LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
- LogicalProject(d=[$0], e=[$1])
+ LogicalProject(a=[$0], b=[$1])
LogicalFilter(condition=[>($0, 10)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
index 98206ae..b904056 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -1,6 +1,6 @@
== Abstract Syntax Tree ==
LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
- LogicalProject(d=[$0], e=[$1])
+ LogicalProject(a=[$0], b=[$1])
LogicalFilter(condition=[>($0, 10)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
@@ -28,9 +28,16 @@ DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
- : Data Sink
- content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ : Map
+ content : to: Row
ship_strategy : Forward
exchange_mode : PIPELINED
+ driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
+ : Data Sink
+ content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+