You are viewing a plain-text version of this content. The canonical link to the original message (a hyperlink in the HTML version) is not preserved in this rendering.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 01:46:50 UTC
[flink] 03/04: [FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bff0985aef4ed43681e6ad3bd81fc460bef3c6a5
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Sep 20 22:30:08 2022 +0800
[FLINK-29222][hive] Fix wrong behavior for Hive's load data inpath
This closes #20778
---
.../planner/delegation/hive/HiveOperationExecutor.java | 8 ++++----
.../flink/connectors/hive/HiveDialectQueryITCase.java | 13 ++++++++++---
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 2010062fe4c..597584cc3fd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -153,14 +153,14 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
hiveLoadDataOperation.getPath(),
hiveLoadDataOperation.getTablePath(),
hiveLoadDataOperation.getPartitionSpec(),
- hiveLoadDataOperation.isSrcLocal(),
- hiveLoadDataOperation.isOverwrite());
+ hiveLoadDataOperation.isOverwrite(),
+ hiveLoadDataOperation.isSrcLocal());
} else {
hiveCatalog.loadTable(
hiveLoadDataOperation.getPath(),
hiveLoadDataOperation.getTablePath(),
- hiveLoadDataOperation.isSrcLocal(),
- hiveLoadDataOperation.isOverwrite());
+ hiveLoadDataOperation.isOverwrite(),
+ hiveLoadDataOperation.isSrcLocal());
}
return Optional.of(TableResultImpl.TABLE_RESULT_OK);
} finally {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index d305e34ef2d..a2fc20bb1e0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -715,17 +715,22 @@ public class HiveDialectQueryITCase {
.replace("$filepath", testLoadCsvFilePath));
// test load data into table
- tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 1), (2, 2)").await();
+ tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2)").await();
tableEnv.executeSql(
String.format(
"load data local inpath '%s' INTO TABLE tab2", warehouse + "/tab1"));
List<Row> result =
CollectionUtil.iteratorToList(
tableEnv.executeSql("select * from tab2").collect());
- assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], +I[2, 1], +I[2, 2]]");
+ assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]");
+ // there should still exist data in tab1
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from tab1").collect());
+ assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2]]");
// test load data overwrite
- tableEnv.executeSql("insert into tab1 values (2, 1), (2, 2)").await();
+ tableEnv.executeSql("insert overwrite table tab1 values (2, 1), (2, 2)").await();
tableEnv.executeSql(
String.format(
"load data local inpath '%s' overwrite into table tab2",
@@ -741,6 +746,8 @@ public class HiveDialectQueryITCase {
"load data inpath '%s' into table p_table partition (dateint=2022) ",
testLoadCsvFilePath))
.await();
+ // the file should be removed
+ assertThat(new File(testLoadCsvFilePath).exists()).isFalse();
result =
CollectionUtil.iteratorToList(
tableEnv.executeSql("select * from p_table where dateint=2022")