You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 02:24:07 UTC
[flink] branch release-1.11 updated: [FLINK-16197][hive] Failed to query partitioned table when partition folder is removed
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new bdffe91 [FLINK-16197][hive] Failed to query partitioned table when partition folder is removed
bdffe91 is described below
commit bdffe91c6ae81a2cdb6033d241946c8d09b96c7c
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 25 10:22:37 2020 +0800
[FLINK-16197][hive] Failed to query partitioned table when partition folder is removed
This closes #11175
---
.../connectors/hive/read/HiveTableInputFormat.java | 11 +++++++++++
.../connectors/hive/TableEnvHiveConnectorTest.java | 21 +++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index 0674e50..ee9fc75 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -286,8 +288,17 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<RowData, H
JobConf jobConf) throws IOException {
List<HiveTableInputSplit> hiveSplits = new ArrayList<>();
int splitNum = 0;
+ FileSystem fs = null;
for (HiveTablePartition partition : partitions) {
StorageDescriptor sd = partition.getStorageDescriptor();
+ Path inputPath = new Path(sd.getLocation());
+ if (fs == null) {
+ fs = inputPath.getFileSystem(jobConf);
+ }
+ // it's possible a partition exists in metastore but the data has been removed
+ if (!fs.exists(inputPath)) {
+ continue;
+ }
InputFormat format;
try {
format = (InputFormat)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 7f6a78d..fa31c8b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -643,6 +643,27 @@ public class TableEnvHiveConnectorTest {
}
}
+ @Test
+ public void testNonExistingPartitionFolder() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.part (x int) partitioned by (p int)");
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("p=1");
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("p=2");
+ hiveShell.execute("alter table db1.part add partition (p=3)");
+ // remove one partition
+ Path toRemove = new Path(hiveCatalog.getHiveTable(new ObjectPath("db1", "part")).getSd().getLocation(), "p=2");
+ FileSystem fs = toRemove.getFileSystem(hiveShell.getHiveConf());
+ fs.delete(toRemove, true);
+
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part").execute().collect());
+ assertEquals("[1,1]", results.toString());
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);