You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 01:46:50 UTC

[flink] 03/04: [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bff0985aef4ed43681e6ad3bd81fc460bef3c6a5
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Sep 20 22:30:08 2022 +0800

    [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath
    
    This closes #20778
---
 .../planner/delegation/hive/HiveOperationExecutor.java      |  8 ++++----
 .../flink/connectors/hive/HiveDialectQueryITCase.java       | 13 ++++++++++---
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 2010062fe4c..597584cc3fd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -153,14 +153,14 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
                         hiveLoadDataOperation.getPath(),
                         hiveLoadDataOperation.getTablePath(),
                         hiveLoadDataOperation.getPartitionSpec(),
-                        hiveLoadDataOperation.isSrcLocal(),
-                        hiveLoadDataOperation.isOverwrite());
+                        hiveLoadDataOperation.isOverwrite(),
+                        hiveLoadDataOperation.isSrcLocal());
             } else {
                 hiveCatalog.loadTable(
                         hiveLoadDataOperation.getPath(),
                         hiveLoadDataOperation.getTablePath(),
-                        hiveLoadDataOperation.isSrcLocal(),
-                        hiveLoadDataOperation.isOverwrite());
+                        hiveLoadDataOperation.isOverwrite(),
+                        hiveLoadDataOperation.isSrcLocal());
             }
             return Optional.of(TableResultImpl.TABLE_RESULT_OK);
         } finally {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index d305e34ef2d..a2fc20bb1e0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -715,17 +715,22 @@ public class HiveDialectQueryITCase {
                                     .replace("$filepath", testLoadCsvFilePath));
 
             // test load data into table
-            tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 1), (2, 2)").await();
+            tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2)").await();
             tableEnv.executeSql(
                     String.format(
                             "load data local inpath '%s' INTO TABLE tab2", warehouse + "/tab1"));
             List<Row> result =
                     CollectionUtil.iteratorToList(
                             tableEnv.executeSql("select * from tab2").collect());
-            assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], +I[2, 1], +I[2, 2]]");
+            assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]");
+            // tab1 should still contain its data: a `local` load must not remove the source
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from tab1").collect());
+            assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]");
 
             // test load data overwrite
-            tableEnv.executeSql("insert into tab1 values (2, 1), (2, 2)").await();
+            tableEnv.executeSql("insert overwrite table tab1 values (2, 1), (2, 2)").await();
             tableEnv.executeSql(
                     String.format(
                             "load data local inpath '%s' overwrite into table tab2",
@@ -741,6 +746,8 @@ public class HiveDialectQueryITCase {
                                     "load data inpath '%s' into table p_table partition (dateint=2022) ",
                                     testLoadCsvFilePath))
                     .await();
+            // the source file should no longer exist after a non-local load
+            assertThat(new File(testLoadCsvFilePath).exists()).isFalse();
             result =
                     CollectionUtil.iteratorToList(
                             tableEnv.executeSql("select * from p_table where dateint=2022")