Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/30 14:28:17 UTC

[flink] branch master updated: [FLINK-27387][hive] Hive dialect supports multi-insert statement (#19647)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bed7993aaf9 [FLINK-27387][hive] Hive dialect supports multi-insert statement (#19647)
bed7993aaf9 is described below

commit bed7993aaf9838efdd254c8140c36d4ae1f80a56
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sat Jul 30 22:28:08 2022 +0800

    [FLINK-27387][hive] Hive dialect supports multi-insert statement (#19647)
---
 .../table/planner/delegation/hive/HiveParser.java  | 93 ++++++++++++++++++----
 .../connectors/hive/HiveDialectQueryITCase.java    | 39 +++++++++
 .../src/test/resources/explain/testMultiInsert.out | 32 ++++++++
 3 files changed, 150 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 1a534c61d36..6f8cc71b99c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -29,8 +29,10 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
 import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -46,6 +48,7 @@ import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.tools.FrameworkConfig;
@@ -64,6 +67,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Timestamp;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -210,10 +214,9 @@ public class HiveParser extends ParserImpl {
     private List<Operation> processCmd(
             String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
         try {
-            final HiveParserContext context = new HiveParserContext(hiveConf);
+            HiveParserContext context = new HiveParserContext(hiveConf);
             // parse statement to get AST
-            final HiveParserASTNode node = HiveASTParseUtils.parse(cmd, context);
-            Operation operation;
+            HiveParserASTNode node = HiveASTParseUtils.parse(cmd, context);
             if (DDL_NODES.contains(node.getType())) {
                 HiveParserQueryState queryState = new HiveParserQueryState(hiveConf);
                 HiveParserDDLSemanticAnalyzer ddlAnalyzer =
@@ -228,19 +231,10 @@ public class HiveParser extends ParserImpl {
                                 frameworkConfig,
                                 plannerContext.getCluster(),
                                 plannerContext.getFlinkContext().getClassLoader());
-                operation = ddlAnalyzer.convertToOperation(node);
-                return Collections.singletonList(operation);
+                return Collections.singletonList(ddlAnalyzer.convertToOperation(node));
             } else {
-                final boolean explain = node.getType() == HiveASTParser.TOK_EXPLAIN;
-                // first child is the underlying explicandum
-                HiveParserASTNode input = explain ? (HiveParserASTNode) node.getChild(0) : node;
-                operation = analyzeSql(context, hiveConf, hiveShim, input);
-                // explain an nop is also considered nop
-                if (explain && !(operation instanceof NopOperation)) {
-                    operation = new ExplainOperation(operation);
-                }
+                return processQuery(context, hiveConf, hiveShim, node);
             }
-            return Collections.singletonList(operation);
         } catch (HiveASTParseException e) {
             // ParseException can happen for flink-specific statements, e.g. catalog DDLs
             try {
@@ -253,6 +247,77 @@ public class HiveParser extends ParserImpl {
         }
     }
 
+    private List<Operation> processQuery(
+            HiveParserContext context, HiveConf hiveConf, HiveShim hiveShim, HiveParserASTNode node)
+            throws SemanticException {
+        final boolean explain = node.getType() == HiveASTParser.TOK_EXPLAIN;
+        // first child is the underlying explicandum
+        HiveParserASTNode input = explain ? (HiveParserASTNode) node.getChild(0) : node;
+        if (explain) {
+            Operation operation = convertASTNodeToOperation(context, hiveConf, hiveShim, input);
+            // explaining a nop is also considered a nop
+            return Collections.singletonList(
+                    operation instanceof NopOperation
+                            ? operation
+                            : new ExplainOperation(operation));
+        }
+        return Collections.singletonList(
+                convertASTNodeToOperation(context, hiveConf, hiveShim, input));
+    }
+
+    private Operation convertASTNodeToOperation(
+            HiveParserContext context,
+            HiveConf hiveConf,
+            HiveShim hiveShim,
+            HiveParserASTNode input)
+            throws SemanticException {
+        if (isMultiDestQuery(input)) {
+            return processMultiDestQuery(context, hiveConf, hiveShim, input);
+        } else {
+            return analyzeSql(context, hiveConf, hiveShim, input);
+        }
+    }
+
+    private boolean isMultiDestQuery(HiveParserASTNode astNode) {
+        // Hive's multi-dest insert will always be [FROM, INSERT+],
+        // so, if its child count is more than 2, it should be a multi-dest query
+        return astNode.getChildCount() > 2;
+    }
+
+    private Operation processMultiDestQuery(
+            HiveParserContext context,
+            HiveConf hiveConf,
+            HiveShim hiveShim,
+            HiveParserASTNode astNode)
+            throws SemanticException {
+        List<Operation> operations = new ArrayList<>();
+        // a multi-insert statement may contain multiple insert nodes, and
+        // the child nodes of the root AST will always be like [FROM, INSERT+];
+        // we pop each insert node and process them one by one to construct a list of insert operations
+        List<HiveParserASTNode> insertASTNodes = new ArrayList<>();
+        // pop the insert node one by one
+        while (astNode.getChildCount() > 1) {
+            insertASTNodes.add((HiveParserASTNode) astNode.deleteChild(1));
+        }
+        for (HiveParserASTNode insertASTNode : insertASTNodes) {
+            // mount the insert node to the root AST, treat it as a normal AST and convert it to
+            // an operation
+            astNode.addChild(insertASTNode);
+            operations.add(analyzeSql(context, hiveConf, hiveShim, astNode));
+            astNode.deleteChild(astNode.getChildCount() - 1);
+        }
+        // then we wrap them into a StatementSetOperation
+        List<ModifyOperation> modifyOperations = new ArrayList<>();
+        for (Operation operation : operations) {
+            Preconditions.checkArgument(
+                    operation instanceof ModifyOperation,
+                    "Encounter an non-ModifyOperation, "
+                            + "only support insert when it contains multiple operations in one single SQL statement.");
+            modifyOperations.add((ModifyOperation) operation);
+        }
+        return new StatementSetOperation(modifyOperations);
+    }
+
     public HiveParserCalcitePlanner createCalcitePlanner(
             HiveParserContext context, HiveParserQueryState queryState, HiveShim hiveShim)
             throws SemanticException {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index ae103d9cc15..a8628db9982 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -50,6 +50,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -435,6 +436,44 @@ public class HiveDialectQueryITCase {
                 .hasMessage(expectedMessage);
     }
 
+    @Test
+    public void testMultiInsert() throws Exception {
+        tableEnv.executeSql("create table t1 (id bigint, name string)");
+        tableEnv.executeSql("create table t2 (id bigint, name string)");
+        tableEnv.executeSql("create table t3 (id bigint, name string, age int)");
+        try {
+            String multiInsertSql =
+                    "from (select id, name, age from t3) t"
+                            + " insert overwrite table t1 select id, name where age < 20"
+                            + "  insert overwrite table t2 select id, name where age > 20";
+            // test explain
+            String actualPlan =
+                    (String)
+                            CollectionUtil.iteratorToList(
+                                            tableEnv.executeSql("explain " + multiInsertSql)
+                                                    .collect())
+                                    .get(0)
+                                    .getField(0);
+            assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMultiInsert.out"));
+            // test execution
+            tableEnv.executeSql("insert into table t3 values (1, 'test1', 18 ), (2, 'test2', 28 )")
+                    .await();
+            tableEnv.executeSql(multiInsertSql).await();
+            List<Row> result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from t1").collect());
+            assertThat(result.toString()).isEqualTo("[+I[1, test1]]");
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from t2").collect());
+            assertThat(result.toString()).isEqualTo("[+I[2, test2]]");
+        } finally {
+            tableEnv.executeSql("drop table t1");
+            tableEnv.executeSql("drop table t2");
+            tableEnv.executeSql("drop table t3");
+        }
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMultiInsert.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMultiInsert.out
new file mode 100644
index 00000000000..dcfd333a2e3
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMultiInsert.out
@@ -0,0 +1,32 @@
+== Abstract Syntax Tree ==
+LogicalSink(table=[test-catalog.default.t1], fields=[id, name])
++- LogicalProject(id=[$0], name=[$1])
+   +- LogicalFilter(condition=[<($2, 20)])
+      +- LogicalProject(id=[$0], name=[$1], age=[$2])
+         +- LogicalTableScan(table=[[test-catalog, default, t3]])
+
+LogicalSink(table=[test-catalog.default.t2], fields=[id, name])
++- LogicalProject(id=[$0], name=[$1])
+   +- LogicalFilter(condition=[>($2, 20)])
+      +- LogicalProject(id=[$0], name=[$1], age=[$2])
+         +- LogicalTableScan(table=[[test-catalog, default, t3]])
+
+== Optimized Physical Plan ==
+Sink(table=[test-catalog.default.t1], fields=[id, name])
++- Calc(select=[id, name], where=[<(age, 20)])
+   +- TableSourceScan(table=[[test-catalog, default, t3]], fields=[id, name, age])
+
+Sink(table=[test-catalog.default.t2], fields=[id, name])
++- Calc(select=[id, name], where=[>(age, 20)])
+   +- TableSourceScan(table=[[test-catalog, default, t3]], fields=[id, name, age])
+
+== Optimized Execution Plan ==
+TableSourceScan(table=[[test-catalog, default, t3]], fields=[id, name, age])(reuse_id=[1])
+
+Sink(table=[test-catalog.default.t1], fields=[id, name])
++- Calc(select=[id, name], where=[(age < 20)])
+   +- Reused(reference_id=[1])
+
+Sink(table=[test-catalog.default.t2], fields=[id, name])
++- Calc(select=[id, name], where=[(age > 20)])
+   +- Reused(reference_id=[1])