Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/10 20:42:31 UTC

[flink] branch master updated: [FLINK-13160] HiveTableSource should implement PartitionableTableSource

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b2e260e  [FLINK-13160] HiveTableSource should implement PartitionableTableSource
b2e260e is described below

commit b2e260e3eb24c72bd397029f1b7fe73f7c0e8aae
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Tue Jul 9 16:38:17 2019 +0800

    [FLINK-13160] HiveTableSource should implement PartitionableTableSource
    
    This PR implements PartitionableTableSource for HiveTableSource to support partition pruning.
    
    This closes #9032.
---
 .../batch/connectors/hive/HiveTableSource.java     | 78 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 10 deletions(-)
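
To illustrate the contract this change adds, here is a minimal, hypothetical sketch of how a caller (for example a planner rule) could drive the new PartitionableTableSource methods on HiveTableSource: read all partition specs via getPartitions(), filter them, and hand the surviving specs back through applyPartitionPruning(), which returns a copy of the source restricted to those partitions. The PartitionPruningSketch class, the partition key "pt", and the filter value are illustrative assumptions only and are not part of this commit.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.batch.connectors.hive.HiveTableSource;
import org.apache.flink.table.sources.TableSource;

public class PartitionPruningSketch {

	// Hypothetical driver: restrict a HiveTableSource to the partitions whose
	// "pt" value equals the given date. Key name and value are examples only.
	public static TableSource prune(HiveTableSource source, String date) {
		// Every partition spec of the table, e.g. [{pt=2019-07-01}, {pt=2019-07-02}, ...].
		List<Map<String, String>> allPartitions = source.getPartitions();

		// Keep only the specs that match the (hypothetical) predicate.
		List<Map<String, String>> remaining = allPartitions.stream()
				.filter(spec -> date.equals(spec.get("pt")))
				.collect(Collectors.toList());

		// Returns a new HiveTableSource that reads only the remaining partitions;
		// the private constructor added in this commit marks the partitions as
		// already initialized, so the Hive metastore is not queried again.
		return source.applyPartitionPruning(remaining);
	}
}
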

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 19e46de..31b896f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.sources.PartitionableTableSource;
+import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -49,27 +52,47 @@ import java.util.Map;
 /**
  * A TableSource implementation to read data from Hive tables.
  */
-public class HiveTableSource extends InputFormatTableSource<Row> {
+public class HiveTableSource extends InputFormatTableSource<Row> implements PartitionableTableSource {
 
 	private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
 
 	private final JobConf jobConf;
 	private final ObjectPath tablePath;
 	private final CatalogTable catalogTable;
-	private List<HiveTablePartition> allPartitions;
+	private List<HiveTablePartition> allHivePartitions;
 	private String hiveVersion;
+	// partitionList represents all partitions as a list of partition spec maps; it is used for partition pruning.
+	private List<Map<String, String>> partitionList = new ArrayList<>();
+	private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
+	private boolean initAllPartitions;
 
 	public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
-		this.jobConf = jobConf;
-		this.tablePath = tablePath;
-		this.catalogTable = catalogTable;
+		this.jobConf = Preconditions.checkNotNull(jobConf);
+		this.tablePath = Preconditions.checkNotNull(tablePath);
+		this.catalogTable = Preconditions.checkNotNull(catalogTable);
 		this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		initAllPartitions = false;
+	}
+
+	private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
+							List<HiveTablePartition> allHivePartitions,
+							String hiveVersion,
+							List<Map<String, String>> partitionList) {
+		this.jobConf = Preconditions.checkNotNull(jobConf);
+		this.tablePath = Preconditions.checkNotNull(tablePath);
+		this.catalogTable = Preconditions.checkNotNull(catalogTable);
+		this.allHivePartitions = allHivePartitions;
+		this.hiveVersion = hiveVersion;
+		this.partitionList = partitionList;
+		this.initAllPartitions = true;
 	}
 
 	@Override
 	public InputFormat getInputFormat() {
-		initAllPartitions();
-		return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
+		if (!initAllPartitions) {
+			initAllPartitions();
+		}
+		return new HiveTableInputFormat(jobConf, catalogTable, allHivePartitions);
 	}
 
 	@Override
@@ -87,8 +110,37 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
 		return DataTypes.ROW(fields);
 	}
 
+	@Override
+	public List<Map<String, String>> getPartitions() {
+		if (!initAllPartitions) {
+			initAllPartitions();
+		}
+		return partitionList;
+	}
+
+	@Override
+	public List<String> getPartitionFieldNames() {
+		return catalogTable.getPartitionKeys();
+	}
+
+	@Override
+	public TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
+		if (catalogTable.getPartitionKeys() == null || catalogTable.getPartitionKeys().size() == 0) {
+			return this;
+		} else {
+			List<HiveTablePartition> remainingHivePartitions = new ArrayList<>();
+			for (Map<String, String> partitionSpec : remainingPartitions) {
+				HiveTablePartition hiveTablePartition = partitionSpec2HiveTablePartition.get(partitionSpec);
+				Preconditions.checkNotNull(hiveTablePartition, String.format("Remaining partition spec %s is not " +
+																			"a known partition of this table", partitionSpec));
+				remainingHivePartitions.add(hiveTablePartition);
+			}
+			return new HiveTableSource(jobConf, tablePath, catalogTable, remainingHivePartitions, hiveVersion, partitionList);
+		}
+	}
+
 	private void initAllPartitions() {
-		allPartitions = new ArrayList<>();
+		allHivePartitions = new ArrayList<>();
 		// Please note that the following directly accesses Hive metastore, which is only a temporary workaround.
 		// Ideally, we need to go thru Catalog API to get all info we need here, which requires some major
 		// refactoring. We will postpone this until we merge Blink to Flink.
@@ -102,21 +154,27 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
 				for (Partition partition : partitions) {
 					StorageDescriptor sd = partition.getSd();
 					Map<String, Object> partitionColValues = new HashMap<>();
+					Map<String, String> partitionSpec = new HashMap<>();
 					for (int i = 0; i < partitionColNames.size(); i++) {
 						String partitionColName = partitionColNames.get(i);
 						String partitionValue = partition.getValues().get(i);
+						partitionSpec.put(partitionColName, partitionValue);
 						DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
 						Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
 						partitionColValues.put(partitionColName, partitionObject);
 					}
-					allPartitions.add(new HiveTablePartition(sd, partitionColValues));
+					HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
+					allHivePartitions.add(hiveTablePartition);
+					partitionList.add(partitionSpec);
+					partitionSpec2HiveTablePartition.put(partitionSpec, hiveTablePartition);
 				}
 			} else {
-				allPartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
+				allHivePartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
 			}
 		} catch (TException e) {
 			throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
 		}
+		initAllPartitions = true;
 	}
 
 	private Object restorePartitionValueFromFromType(String valStr, DataType type) {