Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/30 14:28:17 UTC
[flink] branch master updated: [FLINK-27387][hive] Hive dialect supports multi-insert statement (#19647)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bed7993aaf9 [FLINK-27387][hive] Hive dialect supports multi-insert statement (#19647)
bed7993aaf9 is described below
commit bed7993aaf9838efdd254c8140c36d4ae1f80a56
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sat Jul 30 22:28:08 2022 +0800
[FLINK-27387][hive] Hive dialect supports multi-insert statement (#19647)
---
.../table/planner/delegation/hive/HiveParser.java | 93 ++++++++++++++++++----
.../connectors/hive/HiveDialectQueryITCase.java | 39 +++++++++
.../src/test/resources/explain/testMultiInsert.out | 32 ++++++++
3 files changed, 150 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 1a534c61d36..6f8cc71b99c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -29,8 +29,10 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -46,6 +48,7 @@ import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.FrameworkConfig;
@@ -64,6 +67,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Timestamp;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -210,10 +214,9 @@ public class HiveParser extends ParserImpl {
private List<Operation> processCmd(
String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
try {
- final HiveParserContext context = new HiveParserContext(hiveConf);
+ HiveParserContext context = new HiveParserContext(hiveConf);
// parse statement to get AST
- final HiveParserASTNode node = HiveASTParseUtils.parse(cmd, context);
- Operation operation;
+ HiveParserASTNode node = HiveASTParseUtils.parse(cmd, context);
if (DDL_NODES.contains(node.getType())) {
HiveParserQueryState queryState = new HiveParserQueryState(hiveConf);
HiveParserDDLSemanticAnalyzer ddlAnalyzer =
@@ -228,19 +231,10 @@ public class HiveParser extends ParserImpl {
frameworkConfig,
plannerContext.getCluster(),
plannerContext.getFlinkContext().getClassLoader());
- operation = ddlAnalyzer.convertToOperation(node);
- return Collections.singletonList(operation);
+ return Collections.singletonList(ddlAnalyzer.convertToOperation(node));
} else {
- final boolean explain = node.getType() == HiveASTParser.TOK_EXPLAIN;
- // first child is the underlying explicandum
- HiveParserASTNode input = explain ? (HiveParserASTNode) node.getChild(0) : node;
- operation = analyzeSql(context, hiveConf, hiveShim, input);
- // explain an nop is also considered nop
- if (explain && !(operation instanceof NopOperation)) {
- operation = new ExplainOperation(operation);
- }
+ return processQuery(context, hiveConf, hiveShim, node);
}
- return Collections.singletonList(operation);
} catch (HiveASTParseException e) {
// ParseException can happen for flink-specific statements, e.g. catalog DDLs
try {
@@ -253,6 +247,77 @@ public class HiveParser extends ParserImpl {
}
}
+ private List<Operation> processQuery(
+ HiveParserContext context, HiveConf hiveConf, HiveShim hiveShim, HiveParserASTNode node)
+ throws SemanticException {
+ final boolean explain = node.getType() == HiveASTParser.TOK_EXPLAIN;
+ // first child is the underlying explicandum
+ HiveParserASTNode input = explain ? (HiveParserASTNode) node.getChild(0) : node;
+ if (explain) {
+ Operation operation = convertASTNodeToOperation(context, hiveConf, hiveShim, input);
+ // explaining a nop is also considered a nop
+ return Collections.singletonList(
+ operation instanceof NopOperation
+ ? operation
+ : new ExplainOperation(operation));
+ }
+ return Collections.singletonList(
+ convertASTNodeToOperation(context, hiveConf, hiveShim, input));
+ }
+
+ private Operation convertASTNodeToOperation(
+ HiveParserContext context,
+ HiveConf hiveConf,
+ HiveShim hiveShim,
+ HiveParserASTNode input)
+ throws SemanticException {
+ if (isMultiDestQuery(input)) {
+ return processMultiDestQuery(context, hiveConf, hiveShim, input);
+ } else {
+ return analyzeSql(context, hiveConf, hiveShim, input);
+ }
+ }
+
+ private boolean isMultiDestQuery(HiveParserASTNode astNode) {
+ // Hive's multi-dest insert AST will always have the shape [FROM, INSERT+],
+ // so if its child count is more than 2, it must be a multi-dest query
+ return astNode.getChildCount() > 2;
+ }
+
+ private Operation processMultiDestQuery(
+ HiveParserContext context,
+ HiveConf hiveConf,
+ HiveShim hiveShim,
+ HiveParserASTNode astNode)
+ throws SemanticException {
+ List<Operation> operations = new ArrayList<>();
+ // a multi-insert statement contains multiple INSERT nodes;
+ // the children of the root AST will always be like [FROM, INSERT+],
+ // so we pop each INSERT node and process them one by one to construct a list of insert operations
+ List<HiveParserASTNode> insertASTNodes = new ArrayList<>();
+ // pop the insert nodes one by one
+ while (astNode.getChildCount() > 1) {
+ insertASTNodes.add((HiveParserASTNode) astNode.deleteChild(1));
+ }
+ for (HiveParserASTNode insertASTNode : insertASTNodes) {
+ // mount the insert node back onto the root AST, treat it as a normal
+ // single-insert AST, and convert it to an operation
+ astNode.addChild(insertASTNode);
+ operations.add(analyzeSql(context, hiveConf, hiveShim, astNode));
+ astNode.deleteChild(astNode.getChildCount() - 1);
+ }
+ // then wrap all the insert operations into a single StatementSetOperation
+ List<ModifyOperation> modifyOperations = new ArrayList<>();
+ for (Operation operation : operations) {
+ Preconditions.checkArgument(
+ operation instanceof ModifyOperation,
+ "Encounter an non-ModifyOperation, "
+ + "only support insert when it contains multiple operations in one single SQL statement.");
+ modifyOperations.add((ModifyOperation) operation);
+ }
+ return new StatementSetOperation(modifyOperations);
+ }
+
public HiveParserCalcitePlanner createCalcitePlanner(
HiveParserContext context, HiveParserQueryState queryState, HiveShim hiveShim)
throws SemanticException {
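
To illustrate the splitting technique in processMultiDestQuery above: the root AST of a multi-insert query has the shape [FROM, INSERT+], and the patch detaches every INSERT child, then re-attaches them one at a time so each pass through analyzeSql sees an ordinary single-insert AST. Below is a minimal, self-contained sketch of that loop; the Node class is hypothetical and only mimics the addChild/deleteChild/getChildCount calls of the real HiveParserASTNode.

    import java.util.ArrayList;
    import java.util.List;

    public class MultiInsertSplitSketch {

        /** Hypothetical stand-in for HiveParserASTNode; not a Flink class. */
        static class Node {
            final String token;
            final List<Node> children = new ArrayList<>();

            Node(String token) {
                this.token = token;
            }

            void addChild(Node child) {
                children.add(child);
            }

            Node deleteChild(int index) {
                return children.remove(index);
            }

            int getChildCount() {
                return children.size();
            }
        }

        public static void main(String[] args) {
            // Root of a multi-insert query: [FROM, INSERT, INSERT].
            Node root = new Node("TOK_QUERY");
            root.addChild(new Node("TOK_FROM"));
            root.addChild(new Node("TOK_INSERT_1"));
            root.addChild(new Node("TOK_INSERT_2"));

            // Detach every INSERT child, leaving only FROM on the root.
            List<Node> inserts = new ArrayList<>();
            while (root.getChildCount() > 1) {
                inserts.add(root.deleteChild(1));
            }

            // Re-attach one INSERT at a time: each iteration presents a normal
            // single-insert AST, which is what analyzeSql expects in the patch.
            for (Node insert : inserts) {
                root.addChild(insert);
                System.out.println("analyze [FROM, " + insert.token + "]");
                root.deleteChild(root.getChildCount() - 1);
            }
        }
    }
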
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index ae103d9cc15..a8628db9982 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -50,6 +50,7 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -435,6 +436,44 @@ public class HiveDialectQueryITCase {
.hasMessage(expectedMessage);
}
+ @Test
+ public void testMultiInsert() throws Exception {
+ tableEnv.executeSql("create table t1 (id bigint, name string)");
+ tableEnv.executeSql("create table t2 (id bigint, name string)");
+ tableEnv.executeSql("create table t3 (id bigint, name string, age int)");
+ try {
+ String multiInsertSql =
+ "from (select id, name, age from t3) t"
+ + " insert overwrite table t1 select id, name where age < 20"
+ + " insert overwrite table t2 select id, name where age > 20";
+ // test explain
+ String actualPlan =
+ (String)
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("explain " + multiInsertSql)
+ .collect())
+ .get(0)
+ .getField(0);
+ assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMultiInsert.out"));
+ // test execution
+ tableEnv.executeSql("insert into table t3 values (1, 'test1', 18 ), (2, 'test2', 28 )")
+ .await();
+ tableEnv.executeSql(multiInsertSql).await();
+ List<Row> result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from t1").collect());
+ assertThat(result.toString()).isEqualTo("[+I[1, test1]]");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from t2").collect());
+ assertThat(result.toString()).isEqualTo("[+I[2, test2]]");
+ } finally {
+ tableEnv.executeSql("drop table t1");
+ tableEnv.executeSql("drop table t2");
+ tableEnv.executeSql("drop table t3");
+ }
+ }
+
private void runQFile(File qfile) throws Exception {
QTest qTest = extractQTest(qfile);
for (int i = 0; i < qTest.statements.size(); i++) {
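
The test above drives the feature through the ITCase harness; in an application the same multi-insert statement goes through a Hive-dialect TableEnvironment in one executeSql call. A hedged usage sketch, assuming a working Hive setup; the catalog name, database, hive-conf path, and tables below are placeholders.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class MultiInsertUsageSketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Placeholder catalog settings; point hive-conf at your own cluster.
            tableEnv.registerCatalog(
                    "myhive", new HiveCatalog("myhive", "default", "/path/to/hive-conf"));
            tableEnv.useCatalog("myhive");
            // Multi-insert is only parsed when the Hive dialect is active.
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            // One statement, two sinks; t1/t2/t3 as in the test above.
            tableEnv.executeSql(
                            "from (select id, name, age from t3) t"
                                    + " insert overwrite table t1 select id, name where age < 20"
                                    + " insert overwrite table t2 select id, name where age > 20")
                    .await();
        }
    }
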
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMultiInsert.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMultiInsert.out
new file mode 100644
index 00000000000..dcfd333a2e3
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMultiInsert.out
@@ -0,0 +1,32 @@
+== Abstract Syntax Tree ==
+LogicalSink(table=[test-catalog.default.t1], fields=[id, name])
++- LogicalProject(id=[$0], name=[$1])
+ +- LogicalFilter(condition=[<($2, 20)])
+ +- LogicalProject(id=[$0], name=[$1], age=[$2])
+ +- LogicalTableScan(table=[[test-catalog, default, t3]])
+
+LogicalSink(table=[test-catalog.default.t2], fields=[id, name])
++- LogicalProject(id=[$0], name=[$1])
+ +- LogicalFilter(condition=[>($2, 20)])
+ +- LogicalProject(id=[$0], name=[$1], age=[$2])
+ +- LogicalTableScan(table=[[test-catalog, default, t3]])
+
+== Optimized Physical Plan ==
+Sink(table=[test-catalog.default.t1], fields=[id, name])
++- Calc(select=[id, name], where=[<(age, 20)])
+ +- TableSourceScan(table=[[test-catalog, default, t3]], fields=[id, name, age])
+
+Sink(table=[test-catalog.default.t2], fields=[id, name])
++- Calc(select=[id, name], where=[>(age, 20)])
+ +- TableSourceScan(table=[[test-catalog, default, t3]], fields=[id, name, age])
+
+== Optimized Execution Plan ==
+TableSourceScan(table=[[test-catalog, default, t3]], fields=[id, name, age])(reuse_id=[1])
+
+Sink(table=[test-catalog.default.t1], fields=[id, name])
++- Calc(select=[id, name], where=[(age < 20)])
+ +- Reused(reference_id=[1])
+
+Sink(table=[test-catalog.default.t2], fields=[id, name])
++- Calc(select=[id, name], where=[(age > 20)])
+ +- Reused(reference_id=[1])
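
A note on the execution plan above: the (reuse_id=[1]) / Reused(reference_id=[1]) markers show that the scan of t3 is planned once and shared by both sinks. That follows from wrapping the per-INSERT operations into one StatementSetOperation, the same mechanism behind the Table API StatementSet. A rough default-dialect equivalent, sketched under the assumption that the same tables exist and tableEnv is configured as in the earlier sketch:

    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class StatementSetEquivalentSketch {
        // Two single-insert statements grouped into one StatementSet; the
        // planner optimizes them together and can reuse the t3 scan.
        static void runEquivalent(TableEnvironment tableEnv) throws Exception {
            StatementSet stmtSet = tableEnv.createStatementSet();
            stmtSet.addInsertSql(
                    "insert overwrite t1 select id, name from t3 where age < 20");
            stmtSet.addInsertSql(
                    "insert overwrite t2 select id, name from t3 where age > 20");
            stmtSet.execute().await();
        }
    }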