Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/07/09 17:57:24 UTC

[GitHub] [flink] xuefuz commented on a change in pull request #9032: [FLINK-13160] HiveTableSource should implement PartitionableTableSource

xuefuz commented on a change in pull request #9032: [FLINK-13160] HiveTableSource should implement PartitionableTableSource
URL: https://github.com/apache/flink/pull/9032#discussion_r301718344
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
 ##########
 @@ -49,27 +52,42 @@
 /**
  * A TableSource implementation to read data from Hive tables.
  */
-public class HiveTableSource extends InputFormatTableSource<Row> {
+public class HiveTableSource extends InputFormatTableSource<Row> implements PartitionableTableSource {
 
 	private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
 
 	private final JobConf jobConf;
 	private final ObjectPath tablePath;
 	private final CatalogTable catalogTable;
-	private List<HiveTablePartition> allPartitions;
+	private List<HiveTablePartition> allHivePartitions;
 	private String hiveVersion;
+	// partitionList represents all partitions as a list of partition-spec maps; it is used for partition pruning.
+	private List<Map<String, String>> partitionList = new ArrayList<>();
+	private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
 
 	public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
 		this.jobConf = jobConf;
 		this.tablePath = tablePath;
 		this.catalogTable = catalogTable;
 		this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		initAllPartitions();
+	}
+
+	private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
+							List<HiveTablePartition> allHivePartitions,
+							String hiveVersion,
+							List<Map<String, String>> partitionList) {
+		this.jobConf = jobConf;
+		this.tablePath = tablePath;
+		this.catalogTable = catalogTable;
+		this.allHivePartitions = allHivePartitions;
+		this.hiveVersion = hiveVersion;
+		this.partitionList = partitionList;
 	}
 
 	@Override
 	public InputFormat getInputFormat() {
-		initAllPartitions();
-		return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
+		return new HiveTableInputFormat(jobConf, catalogTable, allHivePartitions);
 
 Review comment:
   I'm not sure of the call order, but I'm wondering whether it makes sense to still create the HiveTableInputFormat with all the partitions here. Some partitions may already have been pruned out.
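
   To make the concern concrete, here is a minimal sketch of what the pruning hook inside HiveTableSource could look like. The method name and signature (applyPartitionPruning taking the remaining partition specs) are assumptions for illustration only and should be adjusted to whatever the PartitionableTableSource interface in this PR actually defines; the fields and the private constructor are the ones shown in the diff above.

	@Override
	public TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
		// Map each remaining partition spec back to its HiveTablePartition using
		// the lookup map populated in initAllPartitions().
		List<HiveTablePartition> remainingHivePartitions = new ArrayList<>();
		for (Map<String, String> spec : remainingPartitions) {
			HiveTablePartition hivePartition = partitionSpec2HiveTablePartition.get(spec);
			if (hivePartition != null) {
				remainingHivePartitions.add(hivePartition);
			}
		}
		// Return a pruned copy built with the private constructor, so that
		// getInputFormat() constructs the HiveTableInputFormat from the remaining
		// partitions only, not from allHivePartitions of the original source.
		return new HiveTableSource(jobConf, tablePath, catalogTable,
				remainingHivePartitions, hiveVersion, remainingPartitions);
	}

   Returning a new source instead of mutating the existing one keeps the original HiveTableSource unchanged, which appears to be the pattern the private constructor in the diff is meant to support, and it speaks to the question above: after pruning, getInputFormat() would no longer see all partitions.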

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services