Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/10 20:42:31 UTC
[flink] branch master updated: [FLINK-13160] HiveTableSource should implement PartitionableTableSource
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b2e260e [FLINK-13160] HiveTableSource should implement PartitionableTableSource
b2e260e is described below
commit b2e260e3eb24c72bd397029f1b7fe73f7c0e8aae
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Tue Jul 9 16:38:17 2019 +0800
[FLINK-13160] HiveTableSource should implement PartitionableTableSource
This PR implements PartitionableTableSource for HiveTableSource to support partition pruning.
This closes #9032.
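
For context, a minimal sketch (not part of this commit) of how a planner rule could exercise the new contract: getPartitions() enumerates every partition as a spec map, and applyPartitionPruning() returns a copy of the source restricted to the surviving specs. The PartitionPruningSketch class, the predicate, and the "dt" partition key below are illustrative assumptions, not code from this change.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import org.apache.flink.batch.connectors.hive.HiveTableSource;
import org.apache.flink.table.sources.TableSource;

public class PartitionPruningSketch {

	// Keeps only the partition specs accepted by the predicate, then asks the
	// source for a pruned copy; the original source is left untouched.
	public static TableSource pruneWith(HiveTableSource source, Predicate<Map<String, String>> keep) {
		List<Map<String, String>> remaining = new ArrayList<>();
		for (Map<String, String> spec : source.getPartitions()) {
			if (keep.test(spec)) {
				remaining.add(spec);
			}
		}
		return source.applyPartitionPruning(remaining);
	}
}

A rule derived from a filter such as dt = '2019-07-09' would call pruneWith(source, spec -> "2019-07-09".equals(spec.get("dt"))), so that only the matching HiveTablePartitions are handed to HiveTableInputFormat.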
---
.../batch/connectors/hive/HiveTableSource.java | 78 +++++++++++++++++++---
1 file changed, 68 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 19e46de..31b896f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.sources.PartitionableTableSource;
+import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -49,27 +52,47 @@ import java.util.Map;
/**
* A TableSource implementation to read data from Hive tables.
*/
-public class HiveTableSource extends InputFormatTableSource<Row> {
+public class HiveTableSource extends InputFormatTableSource<Row> implements PartitionableTableSource {
private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
private final JobConf jobConf;
private final ObjectPath tablePath;
private final CatalogTable catalogTable;
- private List<HiveTablePartition> allPartitions;
+ private List<HiveTablePartition> allHivePartitions;
private String hiveVersion;
+	//partitionList represents all partitions as a list of partition spec maps; it is used for partition pruning.
+ private List<Map<String, String>> partitionList = new ArrayList<>();
+ private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
+ private boolean initAllPartitions;
public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
- this.jobConf = jobConf;
- this.tablePath = tablePath;
- this.catalogTable = catalogTable;
+ this.jobConf = Preconditions.checkNotNull(jobConf);
+ this.tablePath = Preconditions.checkNotNull(tablePath);
+ this.catalogTable = Preconditions.checkNotNull(catalogTable);
this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+ initAllPartitions = false;
+ }
+
+ private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
+ List<HiveTablePartition> allHivePartitions,
+ String hiveVersion,
+ List<Map<String, String>> partitionList) {
+ this.jobConf = Preconditions.checkNotNull(jobConf);
+ this.tablePath = Preconditions.checkNotNull(tablePath);
+ this.catalogTable = Preconditions.checkNotNull(catalogTable);
+ this.allHivePartitions = allHivePartitions;
+ this.hiveVersion = hiveVersion;
+ this.partitionList = partitionList;
+ this.initAllPartitions = true;
}
@Override
public InputFormat getInputFormat() {
- initAllPartitions();
- return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
+ if (!initAllPartitions) {
+ initAllPartitions();
+ }
+ return new HiveTableInputFormat(jobConf, catalogTable, allHivePartitions);
}
@Override
@@ -87,8 +110,37 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
return DataTypes.ROW(fields);
}
+ @Override
+ public List<Map<String, String>> getPartitions() {
+ if (!initAllPartitions) {
+ initAllPartitions();
+ }
+ return partitionList;
+ }
+
+ @Override
+ public List<String> getPartitionFieldNames() {
+ return catalogTable.getPartitionKeys();
+ }
+
+ @Override
+ public TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
+ if (catalogTable.getPartitionKeys() == null || catalogTable.getPartitionKeys().size() == 0) {
+ return this;
+ } else {
+ List<HiveTablePartition> remainingHivePartitions = new ArrayList<>();
+ for (Map<String, String> partitionSpec : remainingPartitions) {
+ HiveTablePartition hiveTablePartition = partitionSpec2HiveTablePartition.get(partitionSpec);
+ Preconditions.checkNotNull(hiveTablePartition, String.format("remainingPartitions must contain " +
+ "partition spec %s", partitionSpec));
+ remainingHivePartitions.add(hiveTablePartition);
+ }
+ return new HiveTableSource(jobConf, tablePath, catalogTable, remainingHivePartitions, hiveVersion, partitionList);
+ }
+ }
+
private void initAllPartitions() {
- allPartitions = new ArrayList<>();
+ allHivePartitions = new ArrayList<>();
// Please note that the following directly accesses Hive metastore, which is only a temporary workaround.
// Ideally, we need to go thru Catalog API to get all info we need here, which requires some major
// refactoring. We will postpone this until we merge Blink to Flink.
@@ -102,21 +154,27 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
for (Partition partition : partitions) {
StorageDescriptor sd = partition.getSd();
Map<String, Object> partitionColValues = new HashMap<>();
+ Map<String, String> partitionSpec = new HashMap<>();
for (int i = 0; i < partitionColNames.size(); i++) {
String partitionColName = partitionColNames.get(i);
String partitionValue = partition.getValues().get(i);
+ partitionSpec.put(partitionColName, partitionValue);
DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
partitionColValues.put(partitionColName, partitionObject);
}
- allPartitions.add(new HiveTablePartition(sd, partitionColValues));
+ HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
+ allHivePartitions.add(hiveTablePartition);
+ partitionList.add(partitionSpec);
+ partitionSpec2HiveTablePartition.put(partitionSpec, hiveTablePartition);
}
} else {
- allPartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
+ allHivePartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
}
} catch (TException e) {
throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
}
+ initAllPartitions = true;
}
private Object restorePartitionValueFromFromType(String valStr, DataType type) {