Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 02:24:07 UTC

[flink] branch release-1.11 updated: [FLINK-16197][hive] Failed to query partitioned table when partition folder is removed

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new bdffe91  [FLINK-16197][hive] Failed to query partitioned table when partition folder is removed
bdffe91 is described below

commit bdffe91c6ae81a2cdb6033d241946c8d09b96c7c
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 25 10:22:37 2020 +0800

    [FLINK-16197][hive] Failed to query partitioned table when partition folder is removed
    
    This closes #11175
---
 .../connectors/hive/read/HiveTableInputFormat.java  | 11 +++++++++++
 .../connectors/hive/TableEnvHiveConnectorTest.java  | 21 +++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index 0674e50..ee9fc75 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -286,8 +288,17 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<RowData, H
 			JobConf jobConf) throws IOException {
 		List<HiveTableInputSplit> hiveSplits = new ArrayList<>();
 		int splitNum = 0;
+		FileSystem fs = null;
 		for (HiveTablePartition partition : partitions) {
 			StorageDescriptor sd = partition.getStorageDescriptor();
+			Path inputPath = new Path(sd.getLocation());
+			if (fs == null) {
+				fs = inputPath.getFileSystem(jobConf);
+			}
+			// it's possible a partition exists in metastore but the data has been removed
+			if (!fs.exists(inputPath)) {
+				continue;
+			}
 			InputFormat format;
 			try {
 				format = (InputFormat)
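
The guard added above resolves each partition's storage location to a Hadoop Path and skips the partition when its directory no longer exists, reusing a single FileSystem handle across all partitions. A minimal standalone sketch of the same pattern, using only the plain Hadoop API (the class and method names below are illustrative, not Flink code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class PartitionPathFilter {

        // Keep only the partition locations whose directory still exists.
        // A partition can remain registered in the metastore after its
        // folder was deleted from the file system, and listing splits for
        // such a partition would otherwise fail.
        static List<Path> existingPartitionPaths(List<String> locations, Configuration conf)
                throws IOException {
            List<Path> existing = new ArrayList<>();
            FileSystem fs = null;
            for (String location : locations) {
                Path path = new Path(location);
                if (fs == null) {
                    // Resolve the FileSystem once; as in the patch above,
                    // all partitions of a table are assumed to live on the
                    // same file system.
                    fs = path.getFileSystem(conf);
                }
                if (fs.exists(path)) {
                    existing.add(path);
                }
            }
            return existing;
        }
    }

Skipping rather than failing means a metastore-only partition simply contributes no rows to the query, which is exactly what the test below asserts.
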
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 7f6a78d..fa31c8b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -643,6 +643,27 @@ public class TableEnvHiveConnectorTest {
 		}
 	}
 
+	@Test
+	public void testNonExistingPartitionFolder() throws Exception {
+		hiveShell.execute("create database db1");
+		try {
+			hiveShell.execute("create table db1.part (x int) partitioned by (p int)");
+			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("p=1");
+			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("p=2");
+			hiveShell.execute("alter table db1.part add partition (p=3)");
+			// remove one partition
+			Path toRemove = new Path(hiveCatalog.getHiveTable(new ObjectPath("db1", "part")).getSd().getLocation(), "p=2");
+			FileSystem fs = toRemove.getFileSystem(hiveShell.getHiveConf());
+			fs.delete(toRemove, true);
+
+			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part").execute().collect());
+			assertEquals("[1,1]", results.toString());
+		} finally {
+			hiveShell.execute("drop database db1 cascade");
+		}
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
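
For reference on the expected value in the test: only the row written to partition p=1 survives the query. The folder for p=2 is deleted before the query runs (so the new guard skips that partition), and p=3 exists only in the metastore with no data. A single Row with fields x=1 and p=1 prints as "1,1", so the collected one-element list prints as "[1,1]".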