You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/21 14:01:30 UTC

[GitHub] [flink] leonardBang opened a new pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

leonardBang opened a new pull request #13729:
URL: https://github.com/apache/flink/pull/13729


   ## What is the purpose of the change
   
   * Currently, the hive table only support load all partitions in temporal join, this pull request support read user specific partition 
   
   
   ## Brief change log
   
     - import option `lookup.join.partition` to support read specific partition, the given partition specification string as following examples:
        *   'lookup.join.partition' = 'pt_year=2020;pt_month=09;pt_day=15',
        *   'lookup.join.partition' = 'pt_year=2020',
        *   'lookup.join.partition' = 'pt_year=2020;pt_month=max_partition();pt_day=max_partition'
     - refactor `HiveLookupFunction.java`
   
   
   ## Verifying this change
   
   Add ITCase and unit test to cover this change.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): ( no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 38db44425ddf7d79eb28ca6bd36dffdb81d456a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010) 
   * 13202dd2477bfb3730e2cb039311b1a0784368dd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8768964c023e7d241203485203ad5216958365a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) 
   * 4a41ac978fceda679c3cd3f09a26011ed8d16029 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
leonardBang commented on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713600454


   CC: @JingsongLi Could you help review ?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517753997



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+				return partValueList;
+			};
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table, the fetcher fetches the latest partition of the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				// fetch latest partitions for partitioned table
+				if (allPartValueToTime.size() > 0) {
+					//sort in desc order
+					allPartValueToTime.sort((o1, o2) -> o2.f1.compareTo(o1.f1));
+					Tuple2<List<String>, Comparable> maxPartition = allPartValueToTime.get(0);
+					context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+				} else {
+					throw new IllegalArgumentException(
+							String.format("At least one partition is required when set '%s' to 'latest' in temporal join," +
+											" but actual partition number is '%s'",
+									STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+				}
+				return partValueList;
+			};
+		} else {
+			// bounded-read partitioned table, the fetcher fetches all partitions of the given filesystem table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				for (Tuple2<List<String>, Comparable> partValueToTime : allPartValueToTime) {
+					context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+				}
+				return partValueList;
+			};
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FileSystemLookupFunction<>(
+				partitionFetcher,
+				fetcherContext,
+				partitionReader,
+				(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
+				keys,
+				hiveTableReloadInterval);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void open() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName,
+						partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {

Review comment:
       Remove this useless method

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);

Review comment:
       `new HiveTablePartition(tableSd, tableProps)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi merged pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #13729:
URL: https://github.com/apache/flink/pull/13729


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9124af4f501b2afeaefa94621a82612846d98783 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958) 
   * 38db44425ddf7d79eb28ca6bd36dffdb81d456a0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r510029011



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##########
@@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
+public class HiveLookupFunction<T extends InputSplit> extends TableFunction<RowData> {

Review comment:
       But we can let Filesystem supports this




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 659b57b42486d3b79197653fec933ce42766388e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 659b57b42486d3b79197653fec933ce42766388e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
leonardBang commented on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-723000866


   I've address your comments and rebase  @JingsongLi 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517171576



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionReader.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Reader that reads all records from given partitions.
+ *
+ *<P>This reader should only use in non-parallel instance, e.g. : used by lookup function.
+ *
+ * @param <P> The type of partition.
+ * @param <OUT> The type of returned record.
+ */
+@Internal
+public interface PartitionReader<P, OUT> extends Closeable, Serializable {
+
+	/**
+	 * Opens the reader with given partitions.
+	 * @throws IOException
+	 */
+	void open(List<P> partitions) throws IOException;
+
+	/**
+	 * Method used to check the partitions have read finished or not.
+	 *
+	 *<p>When this method is called, the reader it guaranteed to be opened.
+	 *
+	 * @return True if the partitions has read finished.
+	 * @throws IOException
+	 */
+	boolean hasNext() throws IOException;

Review comment:
       Just `@Nullable OUT read()` is OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517186451



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemAllPartitionFetcher;
+import org.apache.flink.table.filesystem.FileSystemLatestPartitionFetcher;
+import org.apache.flink.table.filesystem.FileSystemNonPartitionedTableFetcher;
+import org.apache.flink.table.filesystem.FilesystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableCacheTTL;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		List<String> keyNames = new ArrayList<>();
+		TableSchema schema = getTableSchema();
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyNames.add(schema.getFieldName(key[0]).get());
+		}
+		return getLookupFunction(keyNames.toArray(new String[0]));
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			if (monitorInterval.equals(STREAMING_SOURCE_MONITOR_INTERVAL.defaultValue())) {
+				monitorInterval = DEFAULT_LOOKUP_MONITOR_INTERVAL;
+			}
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableCacheTTL = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableCacheTTL = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(String[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context context = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table
+			partitionFetcher = new FileSystemNonPartitionedTableFetcher(context);
+
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table
+			partitionFetcher = new FileSystemLatestPartitionFetcher(context);
+
+		} else {
+			// bounded-read partitioned table
+			partitionFetcher = new FileSystemAllPartitionFetcher(context);
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FilesystemLookupFunction<>(
+				partitionFetcher,
+				partitionReader,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				keys,
+				hiveTableCacheTTL);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void initialize() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName, partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {
+			if (partitionKeys.isEmpty()) {
+				return Optional.of(new HiveTablePartition(tableSd, tableProps));
+			}
+			return Optional.empty();
+		}
+
+		@Override
+		public List<Tuple2<List<String>, Long>> getAllPartValueToTimeList() {
+			FileStatus[] statuses = HivePartitionUtils.getFileStatusRecurse(tableLocation, partitionKeys.size(), fs);
+			List<Tuple2<List<String>, Long>> partValueList = new ArrayList<>();
+			for (FileStatus status : statuses) {
+				List<String> partValues = extractPartitionValues(
+						new org.apache.flink.core.fs.Path(status.getPath().toString()));
+				long timestamp = extractTimestamp(
+						partitionKeys,
+						partValues,
+						// to UTC millisecond.
+						() -> TimestampData.fromTimestamp(
+								new Timestamp(status.getModificationTime())).getMillisecond());
+				partValueList.add(new Tuple2<>(partValues, timestamp));
+			}
+
+			return partValueList;
+		}
+
+		@Override
+		public long extractTimestamp(

Review comment:
       extractTimestamp is a inner method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517186336



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemAllPartitionFetcher;
+import org.apache.flink.table.filesystem.FileSystemLatestPartitionFetcher;
+import org.apache.flink.table.filesystem.FileSystemNonPartitionedTableFetcher;
+import org.apache.flink.table.filesystem.FilesystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableCacheTTL;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		List<String> keyNames = new ArrayList<>();
+		TableSchema schema = getTableSchema();
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyNames.add(schema.getFieldName(key[0]).get());
+		}
+		return getLookupFunction(keyNames.toArray(new String[0]));
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			if (monitorInterval.equals(STREAMING_SOURCE_MONITOR_INTERVAL.defaultValue())) {
+				monitorInterval = DEFAULT_LOOKUP_MONITOR_INTERVAL;
+			}
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableCacheTTL = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableCacheTTL = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(String[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context context = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table
+			partitionFetcher = new FileSystemNonPartitionedTableFetcher(context);
+
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table
+			partitionFetcher = new FileSystemLatestPartitionFetcher(context);
+
+		} else {
+			// bounded-read partitioned table
+			partitionFetcher = new FileSystemAllPartitionFetcher(context);
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FilesystemLookupFunction<>(
+				partitionFetcher,
+				partitionReader,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				keys,
+				hiveTableCacheTTL);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void initialize() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName, partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {
+			if (partitionKeys.isEmpty()) {
+				return Optional.of(new HiveTablePartition(tableSd, tableProps));
+			}
+			return Optional.empty();
+		}
+
+		@Override
+		public List<Tuple2<List<String>, Long>> getAllPartValueToTimeList() {
+			FileStatus[] statuses = HivePartitionUtils.getFileStatusRecurse(tableLocation, partitionKeys.size(), fs);

Review comment:
       I think we should support name comparator.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517172534



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionReader.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Reader that reads all records from given partitions.
+ *
+ *<P>This reader should only use in non-parallel instance, e.g. : used by lookup function.
+ *
+ * @param <P> The type of partition.
+ * @param <OUT> The type of returned record.
+ */
+@Internal
+public interface PartitionReader<P, OUT> extends Closeable, Serializable {
+
+	/**
+	 * Opens the reader with given partitions.
+	 * @throws IOException
+	 */
+	void open(List<P> partitions) throws IOException;
+
+	/**
+	 * Method used to check the partitions have read finished or not.
+	 *
+	 *<p>When this method is called, the reader it guaranteed to be opened.
+	 *
+	 * @return True if the partitions has read finished.
+	 * @throws IOException

Review comment:
       Remove this comment, ide will warn this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r509859082



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##########
@@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
+public class HiveLookupFunction<T extends InputSplit> extends TableFunction<RowData> {

Review comment:
       Why do this? Why not Filesystem also have this lookup capability?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 38db44425ddf7d79eb28ca6bd36dffdb81d456a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 13202dd2477bfb3730e2cb039311b1a0784368dd Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029) 
   * 8768964c023e7d241203485203ad5216958365a0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517157699



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -74,24 +74,37 @@
 							" NOTES: Please make sure that each partition/file should be written" +
 							" atomically, otherwise the reader may get incomplete data.");
 
+	public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
+			key("streaming-source.partition.include")
+					.stringType()
+					.defaultValue("all")
+					.withDescription("Option to set the partitions to read, the supported values " +
+							"are \"all\" and \"latest\"," +
+							" the \"all\" means read all partitions; the \"latest\" means read latest " +
+							"partition in order of streaming-source.partition.order, the \"latest\" only works" +
+							" when the streaming hive source table used as temporal table. " +
+							"By default the option is \"all\".\n.");
+
 	public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL =
 			key("streaming-source.monitor-interval")
 					.durationType()
 					.defaultValue(Duration.ofMinutes(1))

Review comment:
       I think it is better to remove this default value




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] lirui-apache commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r518751788



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive table as dimension table and always lookup the latest partition data, in this
+ * case, hive table source is a continuous read source but currently we implements it by LookupFunction. Because
+ * currently TableSource can not tell the downstream when the latest partition has been read finished. This is a
+ * temporarily workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(

Review comment:
       Does this mean we require `STREAMING_SOURCE_ENABLE` to be set in order to load latest partition in temporal join? IMHO this limitation is not very friendly because many users still want to be able to use their hive tables in batch analysis. I think we should only require `STREAMING_SOURCE_ENABLE` if users want to consume data in a streaming fashion, i.e. periodically monitor files/partitions and fetch data incrementally.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r518800038



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive table as dimension table and always lookup the latest partition data, in this
+ * case, hive table source is a continuous read source but currently we implements it by LookupFunction. Because
+ * currently TableSource can not tell the downstream when the latest partition has been read finished. This is a
+ * temporarily workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(

Review comment:
       Yes, because we think using partition project is better to load specific partition in batch analysis, and from the semantic, a batch table does not have the latest partition but streaming hive table has. And for batch, user can still use `LOOKUP_JOIN_CACHE_TTL` to set the reload interval.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517158432



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -51,125 +45,113 @@
 import java.util.stream.IntStream;
 
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup function for filesystem connector tables.
+ *
+ * <p>The hive connector and filesystem connector share read/write files code.
+ * Currently, only this function only used in hive connector.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
-
-	private static final long serialVersionUID = 1L;
+public class FilesystemLookupFunction<P> extends TableFunction<RowData> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FilesystemLookupFunction.class);
 
 	// the max number of retries before throwing exception, in case of failure to load the table into cache
 	private static final int MAX_RETRIES = 3;
 	// interval between retries
 	private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
 
-	private final InputFormat<RowData, T> inputFormat;
-	// names and types of the records returned by the input format
-	private final String[] producedNames;
-	private final DataType[] producedTypes;
+	private final PartitionFetcher<P> partitionFetcher;
+	private final PartitionReader<P, RowData> partitionReader;
+	private final int[] lookupCols;
+	private final RowData.FieldGetter[] lookupFieldGetters;
 	private final Duration cacheTTL;
+	private final TypeSerializer<RowData> serializer;
+	private final DataType[] fieldTypes;
+	private final String[] fieldNames;
 
-	// indices of lookup columns in the record returned by input format
-	private final int[] lookupCols;
-	// use Row as key for the cache
-	private transient Map<Row, List<RowData>> cache;
+	// cache for lookup data
+	private transient Map<RowData, List<RowData>> cache;
 	// timestamp when cache expires
 	private transient long nextLoadTime;
-	// serializer to copy RowData
-	private transient TypeSerializer<RowData> serializer;
-	// converters to convert data from internal to external in order to generate keys for the cache
-	private final DataFormatConverter[] converters;
 
-	public FileSystemLookupFunction(
-			InputFormat<RowData, T> inputFormat,
+	public FilesystemLookupFunction(
+			PartitionFetcher<P> partitionFetcher,
+			PartitionReader<P, RowData> partitionReader,
+			DataType[] fieldTypes,

Review comment:
       You can just pass a `RowType` here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8768964c023e7d241203485203ad5216958365a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041) 
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "924b755349e6ef14f84d26a83a9addb4dd9dc6c6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9237",
       "triggerID" : "924b755349e6ef14f84d26a83a9addb4dd9dc6c6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230) 
   * 4a41ac978fceda679c3cd3f09a26011ed8d16029 UNKNOWN
   * 924b755349e6ef14f84d26a83a9addb4dd9dc6c6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9237) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517752511



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+				return partValueList;
+			};
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table, the fetcher fetches the latest partition of the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				// fetch latest partitions for partitioned table
+				if (allPartValueToTime.size() > 0) {
+					//sort in desc order
+					allPartValueToTime.sort((o1, o2) -> o2.f1.compareTo(o1.f1));
+					Tuple2<List<String>, Comparable> maxPartition = allPartValueToTime.get(0);
+					context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+				} else {
+					throw new IllegalArgumentException(
+							String.format("At least one partition is required when set '%s' to 'latest' in temporal join," +
+											" but actual partition number is '%s'",
+									STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+				}
+				return partValueList;
+			};
+		} else {
+			// bounded-read partitioned table, the fetcher fetches all partitions of the given filesystem table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				for (Tuple2<List<String>, Comparable> partValueToTime : allPartValueToTime) {
+					context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+				}
+				return partValueList;
+			};
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FileSystemLookupFunction<>(
+				partitionFetcher,
+				fetcherContext,
+				partitionReader,
+				(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
+				keys,
+				hiveTableReloadInterval);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void open() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName,
+						partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {
+			if (partitionKeys.isEmpty()) {
+				return Optional.of(new HiveTablePartition(tableSd, tableProps));
+			}
+			return Optional.empty();
+		}
+
+		@Override
+		public List<Tuple2<List<String>, Comparable>> getPartValueWithComparableObjList() throws Exception {

Review comment:
       You can return a `ComparablePartition` interface in Context.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517753997



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+				return partValueList;
+			};
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table, the fetcher fetches the latest partition of the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				// fetch latest partitions for partitioned table
+				if (allPartValueToTime.size() > 0) {
+					//sort in desc order
+					allPartValueToTime.sort((o1, o2) -> o2.f1.compareTo(o1.f1));
+					Tuple2<List<String>, Comparable> maxPartition = allPartValueToTime.get(0);
+					context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+				} else {
+					throw new IllegalArgumentException(
+							String.format("At least one partition is required when set '%s' to 'latest' in temporal join," +
+											" but actual partition number is '%s'",
+									STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+				}
+				return partValueList;
+			};
+		} else {
+			// bounded-read partitioned table, the fetcher fetches all partitions of the given filesystem table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				for (Tuple2<List<String>, Comparable> partValueToTime : allPartValueToTime) {
+					context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+				}
+				return partValueList;
+			};
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FileSystemLookupFunction<>(
+				partitionFetcher,
+				fetcherContext,
+				partitionReader,
+				(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
+				keys,
+				hiveTableReloadInterval);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void open() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName,
+						partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {

Review comment:
       Remove this useless method, you can merge this to `getPartition `




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517158596



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -51,125 +45,113 @@
 import java.util.stream.IntStream;
 
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup function for filesystem connector tables.
+ *
+ * <p>The hive connector and filesystem connector share read/write files code.
+ * Currently, only this function only used in hive connector.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
-
-	private static final long serialVersionUID = 1L;
+public class FilesystemLookupFunction<P> extends TableFunction<RowData> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FilesystemLookupFunction.class);
 
 	// the max number of retries before throwing exception, in case of failure to load the table into cache
 	private static final int MAX_RETRIES = 3;
 	// interval between retries
 	private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
 
-	private final InputFormat<RowData, T> inputFormat;
-	// names and types of the records returned by the input format
-	private final String[] producedNames;
-	private final DataType[] producedTypes;
+	private final PartitionFetcher<P> partitionFetcher;
+	private final PartitionReader<P, RowData> partitionReader;
+	private final int[] lookupCols;
+	private final RowData.FieldGetter[] lookupFieldGetters;
 	private final Duration cacheTTL;
+	private final TypeSerializer<RowData> serializer;
+	private final DataType[] fieldTypes;
+	private final String[] fieldNames;
 
-	// indices of lookup columns in the record returned by input format
-	private final int[] lookupCols;
-	// use Row as key for the cache
-	private transient Map<Row, List<RowData>> cache;
+	// cache for lookup data
+	private transient Map<RowData, List<RowData>> cache;
 	// timestamp when cache expires
 	private transient long nextLoadTime;
-	// serializer to copy RowData
-	private transient TypeSerializer<RowData> serializer;
-	// converters to convert data from internal to external in order to generate keys for the cache
-	private final DataFormatConverter[] converters;
 
-	public FileSystemLookupFunction(
-			InputFormat<RowData, T> inputFormat,
+	public FilesystemLookupFunction(
+			PartitionFetcher<P> partitionFetcher,
+			PartitionReader<P, RowData> partitionReader,
+			DataType[] fieldTypes,
+			String[] fieldNames,
 			String[] lookupKeys,
-			String[] producedNames,
-			DataType[] producedTypes,
 			Duration cacheTTL) {
-		lookupCols = new int[lookupKeys.length];
-		converters = new DataFormatConverter[lookupKeys.length];
-		Map<String, Integer> nameToIndex = IntStream.range(0, producedNames.length).boxed().collect(
-				Collectors.toMap(i -> producedNames[i], i -> i));
+		this.cacheTTL = cacheTTL;
+		this.partitionFetcher = partitionFetcher;
+		this.partitionReader = partitionReader;
+		this.fieldTypes = fieldTypes;
+		this.fieldNames = fieldNames;
+		this.lookupCols = new int[lookupKeys.length];
+		this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
+		Map<String, Integer> nameToIndex = IntStream.range(0, fieldNames.length).boxed().collect(
+				Collectors.toMap(i -> fieldNames[i], i -> i));
 		for (int i = 0; i < lookupKeys.length; i++) {
 			Integer index = nameToIndex.get(lookupKeys[i]);
 			Preconditions.checkArgument(index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
-			converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[index]);
+			lookupFieldGetters[i] = RowData.createFieldGetter(fieldTypes[index].getLogicalType(), index);
 			lookupCols[i] = index;
 		}
-		this.inputFormat = inputFormat;
-		this.producedNames = producedNames;
-		this.producedTypes = producedTypes;
-		this.cacheTTL = cacheTTL;
-	}
-
-	@Override
-	public TypeInformation<RowData> getResultType() {
-		return InternalTypeInfo.ofFields(
-				Arrays.stream(producedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
-				producedNames);
+		this.serializer = getResultType().createSerializer(new ExecutionConfig());

Review comment:
       Use `InternalSerializers.create`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230) 
   * 4a41ac978fceda679c3cd3f09a26011ed8d16029 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 659b57b42486d3b79197653fec933ce42766388e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033) 
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc53ef3b7595acbc03bdaf1e01360cb802ff5d52 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910) 
   * 9124af4f501b2afeaefa94621a82612846d98783 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc53ef3b7595acbc03bdaf1e01360cb802ff5d52 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517159584



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Fetcher to fetch the suitable partitions of a filesystem table.
+ *
+ * @param <P> The type to describe a partition.
+ */
+@Internal
+public interface PartitionFetcher<P> extends Serializable {
+
+	/**
+	 * Open the resources of the fetcher.
+	 */
+	void open() throws Exception;
+
+	/**
+	 * Fetch the suitable partitions, call this method should guarantee the fetcher has opened.
+	 */
+	List<P> fetch() throws Exception;

Review comment:
       I think it is better to keep `Context` here, and we don't need `open` and `close`.
   (All `open` and `close` are the same)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
leonardBang commented on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-723377617


   @flinkbot run azure


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230) 
   * 4a41ac978fceda679c3cd3f09a26011ed8d16029 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 38db44425ddf7d79eb28ca6bd36dffdb81d456a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010) 
   * 13202dd2477bfb3730e2cb039311b1a0784368dd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517839019



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HivePartitionUtils.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.util;

Review comment:
       Better to `org.apache.flink.connectors.hive.util`, because this is for read and write instead of for HiveCatalog.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
leonardBang commented on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-723426962


   my azure pipeline passed:
   https://dev.azure.com/xbjtdcq/xbjtdcq/_build/results?buildId=290&view=results


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517749031



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+				return partValueList;
+			};
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table, the fetcher fetches the latest partition of the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				// fetch latest partitions for partitioned table
+				if (allPartValueToTime.size() > 0) {
+					//sort in desc order
+					allPartValueToTime.sort((o1, o2) -> o2.f1.compareTo(o1.f1));
+					Tuple2<List<String>, Comparable> maxPartition = allPartValueToTime.get(0);
+					context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+				} else {
+					throw new IllegalArgumentException(
+							String.format("At least one partition is required when set '%s' to 'latest' in temporal join," +
+											" but actual partition number is '%s'",
+									STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+				}
+				return partValueList;
+			};
+		} else {
+			// bounded-read partitioned table, the fetcher fetches all partitions of the given filesystem table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				for (Tuple2<List<String>, Comparable> partValueToTime : allPartValueToTime) {
+					context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+				}
+				return partValueList;
+			};
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FileSystemLookupFunction<>(
+				partitionFetcher,
+				fetcherContext,
+				partitionReader,
+				(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
+				keys,
+				hiveTableReloadInterval);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void open() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName,
+						partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {
+			if (partitionKeys.isEmpty()) {
+				return Optional.of(new HiveTablePartition(tableSd, tableProps));
+			}
+			return Optional.empty();
+		}
+
+		@Override
+		public List<Tuple2<List<String>, Comparable>> getPartValueWithComparableObjList() throws Exception {
+			List<Tuple2<List<String>, Comparable>> partValueList = new ArrayList<>();
+			switch (consumeOrder) {
+				case PARTITION_NAME_ORDER:
+					List<String> partitionNames = metaStoreClient.listPartitionNames(
+							tablePath.getDatabaseName(),
+							tablePath.getObjectName(),
+							Short.MAX_VALUE);
+					for (String partitionName : partitionNames) {
+						List<String> partValues = extractPartitionValues(new org.apache.flink.core.fs.Path(partitionName));
+						Comparable comparable = partValues.toString();

Review comment:
       Please implement `Comparable`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068) 
   * bc53ef3b7595acbc03bdaf1e01360cb802ff5d52 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517757935



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -74,24 +74,39 @@
 							" NOTES: Please make sure that each partition/file should be written" +
 							" atomically, otherwise the reader may get incomplete data.");
 
+	public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
+			key("streaming-source.partition.include")
+					.stringType()
+					.defaultValue("all")
+					.withDescription("Option to set the partitions to read, the supported values " +
+							"are \"all\" and \"latest\"," +
+							" the \"all\" means read all partitions; the \"latest\" means read latest " +
+							"partition in order of streaming-source.partition.order, the \"latest\" only works" +
+							" when the streaming hive source table used as temporal table. " +
+							"By default the option is \"all\".\n.");
+
 	public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL =
 			key("streaming-source.monitor-interval")
 					.durationType()
-					.defaultValue(Duration.ofMinutes(1))
+					.noDefaultValue()
 					.withDescription("Time interval for consecutively monitoring partition/file.");
 
-	public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER =
-			key("streaming-source.consume-order")
+	public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_ORDER =
+			key("streaming-source.partition-order")
 					.stringType()
 					.defaultValue("create-time")

Review comment:
       Do we need to change this default value to `partition-name`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9124af4f501b2afeaefa94621a82612846d98783 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 659b57b42486d3b79197653fec933ce42766388e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033) 
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r509860215



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##########
@@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
+public class HiveLookupFunction<T extends InputSplit> extends TableFunction<RowData> {

Review comment:
       The  `FileSystemLookupFunction ` only used in HiveTableSource.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517173050



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -187,11 +169,34 @@ private void checkCacheReload() {
 		}
 	}
 
-	private Row extractKey(RowData row) {
-		Row key = new Row(lookupCols.length);
+	private RowData extractLookupKey(RowData row) {
+		GenericRowData key = new GenericRowData(lookupCols.length);
 		for (int i = 0; i < lookupCols.length; i++) {
-			key.setField(i, converters[i].toExternal(row, lookupCols[i]));
+			key.setField(i, lookupFieldGetters[i].getFieldOrNull(row));
 		}
 		return key;
 	}
+
+	@Override
+	public void close() throws Exception {
+		if (this.partitionFetcher != null) {

Review comment:
       `partitionFetcher` never null




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068) 
   * bc53ef3b7595acbc03bdaf1e01360cb802ff5d52 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4a41ac978fceda679c3cd3f09a26011ed8d16029",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230",
       "triggerID" : "723377617",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "924b755349e6ef14f84d26a83a9addb4dd9dc6c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "924b755349e6ef14f84d26a83a9addb4dd9dc6c6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9230) 
   * 4a41ac978fceda679c3cd3f09a26011ed8d16029 UNKNOWN
   * 924b755349e6ef14f84d26a83a9addb4dd9dc6c6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517750633



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+				return partValueList;
+			};
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table, the fetcher fetches the latest partition of the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				// fetch latest partitions for partitioned table
+				if (allPartValueToTime.size() > 0) {
+					//sort in desc order
+					allPartValueToTime.sort((o1, o2) -> o2.f1.compareTo(o1.f1));
+					Tuple2<List<String>, Comparable> maxPartition = allPartValueToTime.get(0);
+					context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+				} else {
+					throw new IllegalArgumentException(
+							String.format("At least one partition is required when set '%s' to 'latest' in temporal join," +
+											" but actual partition number is '%s'",
+									STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+				}
+				return partValueList;
+			};
+		} else {
+			// bounded-read partitioned table, the fetcher fetches all partitions of the given filesystem table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				for (Tuple2<List<String>, Comparable> partValueToTime : allPartValueToTime) {
+					context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+				}
+				return partValueList;
+			};
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FileSystemLookupFunction<>(
+				partitionFetcher,
+				fetcherContext,
+				partitionReader,
+				(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
+				keys,
+				hiveTableReloadInterval);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void open() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName,
+						partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {
+			if (partitionKeys.isEmpty()) {
+				return Optional.of(new HiveTablePartition(tableSd, tableProps));
+			}
+			return Optional.empty();
+		}
+
+		@Override
+		public List<Tuple2<List<String>, Comparable>> getPartValueWithComparableObjList() throws Exception {
+			List<Tuple2<List<String>, Comparable>> partValueList = new ArrayList<>();
+			switch (consumeOrder) {
+				case PARTITION_NAME_ORDER:
+					List<String> partitionNames = metaStoreClient.listPartitionNames(
+							tablePath.getDatabaseName(),
+							tablePath.getObjectName(),
+							Short.MAX_VALUE);
+					for (String partitionName : partitionNames) {
+						List<String> partValues = extractPartitionValues(new org.apache.flink.core.fs.Path(partitionName));
+						Comparable comparable = partValues.toString();
+						partValueList.add(new Tuple2<>(partValues, comparable));
+					}
+					break;
+				case CREATE_TIME_ORDER:
+					FileStatus[] statuses = HivePartitionUtils.getFileStatusRecurse(tableLocation, partitionKeys.size(), fs);
+					for (FileStatus status : statuses) {
+						List<String> partValues = extractPartitionValues(
+								new org.apache.flink.core.fs.Path(status.getPath().toString()));
+						Comparable comparable = TimestampData.fromTimestamp(new Timestamp(status.getModificationTime()))
+								.getMillisecond();
+						partValueList.add(new Tuple2<>(partValues, comparable));
+					}
+					break;
+				case PARTITION_TIME_ORDER:
+					List<Partition> partitions = metaStoreClient.listPartitions(

Review comment:
       `listPartitionNames`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713602234


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 659b57b42486d3b79197653fec933ce42766388e (Wed Oct 21 14:04:50 UTC 2020)
   
    ✅no warnings
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517750633



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+				return partValueList;
+			};
+		} else if (isStreamingSource()) {
+			// streaming-read partitioned table, the fetcher fetches the latest partition of the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				// fetch latest partitions for partitioned table
+				if (allPartValueToTime.size() > 0) {
+					//sort in desc order
+					allPartValueToTime.sort((o1, o2) -> o2.f1.compareTo(o1.f1));
+					Tuple2<List<String>, Comparable> maxPartition = allPartValueToTime.get(0);
+					context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+				} else {
+					throw new IllegalArgumentException(
+							String.format("At least one partition is required when set '%s' to 'latest' in temporal join," +
+											" but actual partition number is '%s'",
+									STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+				}
+				return partValueList;
+			};
+		} else {
+			// bounded-read partitioned table, the fetcher fetches all partitions of the given filesystem table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				List<Tuple2<List<String>, Comparable>> allPartValueToTime = context.getPartValueWithComparableObjList();
+				for (Tuple2<List<String>, Comparable> partValueToTime : allPartValueToTime) {
+					context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+				}
+				return partValueList;
+			};
+		}
+
+		PartitionReader<HiveTablePartition, RowData> partitionReader = new HiveInputFormatPartitionReader(
+				jobConf,
+				hiveVersion,
+				tablePath,
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				catalogTable.getPartitionKeys(),
+				projectedFields,
+				flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+		return new FileSystemLookupFunction<>(
+				partitionFetcher,
+				fetcherContext,
+				partitionReader,
+				(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
+				keys,
+				hiveTableReloadInterval);
+	}
+
+	/**
+	 * PartitionFetcher.Context for {@link HiveTablePartition}.
+	 */
+	static class HiveTablePartitionFetcherContext implements PartitionFetcher.Context<HiveTablePartition> {
+
+		private static final long serialVersionUID = 1L;
+		private final ObjectPath tablePath;
+		private final HiveShim hiveShim;
+		private final JobConfWrapper confWrapper;
+		private final List<String> partitionKeys;
+		private final DataType[] fieldTypes;
+		private final String[] fieldNames;
+		private final Configuration configuration;
+		private final String defaultPartitionName;
+
+		private transient IMetaStoreClient metaStoreClient;
+		private transient StorageDescriptor tableSd;
+		private transient Properties tableProps;
+		private transient PartitionTimeExtractor extractor;
+		private transient ConsumeOrder consumeOrder;
+		private transient Path tableLocation;
+		private transient Table table;
+		private transient FileSystem fs;
+
+		public HiveTablePartitionFetcherContext(
+				ObjectPath tablePath,
+				HiveShim hiveShim,
+				JobConfWrapper confWrapper,
+				List<String> partitionKeys,
+				DataType[] fieldTypes,
+				String[] fieldNames,
+				Configuration configuration,
+				String defaultPartitionName) {
+			this.tablePath = tablePath;
+			this.hiveShim = hiveShim;
+			this.confWrapper = confWrapper;
+			this.partitionKeys = partitionKeys;
+			this.fieldTypes = fieldTypes;
+			this.fieldNames = fieldNames;
+			this.configuration = configuration;
+			this.defaultPartitionName = defaultPartitionName;
+		}
+
+		@Override
+		public void open() throws Exception {
+			metaStoreClient = hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+			table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+			tableSd = table.getSd();
+			tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+			String consumeOrderStr = configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+			consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
+			String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+			String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+			String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+			extractor = PartitionTimeExtractor.create(
+					Thread.currentThread().getContextClassLoader(),
+					extractorKind,
+					extractorClass,
+					extractorPattern);
+			tableLocation = new Path(table.getSd().getLocation());
+			fs = tableLocation.getFileSystem(confWrapper.conf());
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
+			if (partitionKeys.isEmpty()) {
+				return Optional.empty();
+			}
+			try {
+				Partition partition = metaStoreClient.getPartition(
+						tablePath.getDatabaseName(),
+						tablePath.getObjectName(),
+						partValues);
+				HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(
+						partitionKeys,
+						fieldNames,
+						fieldTypes,
+						hiveShim,
+						tableProps,
+						defaultPartitionName,
+						partition);
+				return Optional.of(hiveTablePartition);
+			} catch (NoSuchObjectException e) {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		public Optional<HiveTablePartition> getNonPartitionedTablePartition() {
+			if (partitionKeys.isEmpty()) {
+				return Optional.of(new HiveTablePartition(tableSd, tableProps));
+			}
+			return Optional.empty();
+		}
+
+		@Override
+		public List<Tuple2<List<String>, Comparable>> getPartValueWithComparableObjList() throws Exception {
+			List<Tuple2<List<String>, Comparable>> partValueList = new ArrayList<>();
+			switch (consumeOrder) {
+				case PARTITION_NAME_ORDER:
+					List<String> partitionNames = metaStoreClient.listPartitionNames(
+							tablePath.getDatabaseName(),
+							tablePath.getObjectName(),
+							Short.MAX_VALUE);
+					for (String partitionName : partitionNames) {
+						List<String> partValues = extractPartitionValues(new org.apache.flink.core.fs.Path(partitionName));
+						Comparable comparable = partValues.toString();
+						partValueList.add(new Tuple2<>(partValues, comparable));
+					}
+					break;
+				case CREATE_TIME_ORDER:
+					FileStatus[] statuses = HivePartitionUtils.getFileStatusRecurse(tableLocation, partitionKeys.size(), fs);
+					for (FileStatus status : statuses) {
+						List<String> partValues = extractPartitionValues(
+								new org.apache.flink.core.fs.Path(status.getPath().toString()));
+						Comparable comparable = TimestampData.fromTimestamp(new Timestamp(status.getModificationTime()))
+								.getMillisecond();
+						partValueList.add(new Tuple2<>(partValues, comparable));
+					}
+					break;
+				case PARTITION_TIME_ORDER:
+					List<Partition> partitions = metaStoreClient.listPartitions(

Review comment:
       Why not `listPartitionNames`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 38db44425ddf7d79eb28ca6bd36dffdb81d456a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010) 
   * 13202dd2477bfb3730e2cb039311b1a0784368dd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029) 
   * 8768964c023e7d241203485203ad5216958365a0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517157414



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -51,125 +45,113 @@
 import java.util.stream.IntStream;
 
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup function for filesystem connector tables.
+ *
+ * <p>The hive connector and filesystem connector share read/write files code.
+ * Currently, only this function only used in hive connector.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
-
-	private static final long serialVersionUID = 1L;
+public class FilesystemLookupFunction<P> extends TableFunction<RowData> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FilesystemLookupFunction.class);
 
 	// the max number of retries before throwing exception, in case of failure to load the table into cache
 	private static final int MAX_RETRIES = 3;
 	// interval between retries
 	private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
 
-	private final InputFormat<RowData, T> inputFormat;
-	// names and types of the records returned by the input format
-	private final String[] producedNames;
-	private final DataType[] producedTypes;
+	private final PartitionFetcher<P> partitionFetcher;
+	private final PartitionReader<P, RowData> partitionReader;
+	private final int[] lookupCols;
+	private final RowData.FieldGetter[] lookupFieldGetters;
 	private final Duration cacheTTL;
+	private final TypeSerializer<RowData> serializer;
+	private final DataType[] fieldTypes;
+	private final String[] fieldNames;
 
-	// indices of lookup columns in the record returned by input format
-	private final int[] lookupCols;
-	// use Row as key for the cache
-	private transient Map<Row, List<RowData>> cache;
+	// cache for lookup data
+	private transient Map<RowData, List<RowData>> cache;
 	// timestamp when cache expires
 	private transient long nextLoadTime;
-	// serializer to copy RowData
-	private transient TypeSerializer<RowData> serializer;
-	// converters to convert data from internal to external in order to generate keys for the cache
-	private final DataFormatConverter[] converters;
 
-	public FileSystemLookupFunction(
-			InputFormat<RowData, T> inputFormat,
+	public FilesystemLookupFunction(
+			PartitionFetcher<P> partitionFetcher,
+			PartitionReader<P, RowData> partitionReader,
+			DataType[] fieldTypes,
+			String[] fieldNames,
 			String[] lookupKeys,
-			String[] producedNames,
-			DataType[] producedTypes,
 			Duration cacheTTL) {

Review comment:
       I think it is better to change to `reloadInterval`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] lirui-apache commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r519093000



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive table as dimension table and always lookup the latest partition data, in this
+ * case, hive table source is a continuous read source but currently we implements it by LookupFunction. Because
+ * currently TableSource can not tell the downstream when the latest partition has been read finished. This is a
+ * temporarily workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(

Review comment:
       Problem is, when `STREAMING_SOURCE_ENABLE` is set, the table can no longer be used in a batch job. And given that not many users are using the real "streaming read", it basically means such tables can only be used in temporal join. I'm fine if this is by design, but still feel it's unfriendly and unnecessary.
   
   IMO, any table can have "latest partition" as long as the partitions are comparable in some way. Whether the data should be read in batch or streaming mode is an orthogonal concept.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517153362



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -51,125 +45,113 @@
 import java.util.stream.IntStream;
 
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup function for filesystem connector tables.
+ *
+ * <p>The hive connector and filesystem connector share read/write files code.
+ * Currently, only this function only used in hive connector.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {

Review comment:
       Revert name changing.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -51,125 +45,113 @@
 import java.util.stream.IntStream;
 
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup function for filesystem connector tables.
+ *
+ * <p>The hive connector and filesystem connector share read/write files code.
+ * Currently, only this function only used in hive connector.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
-
-	private static final long serialVersionUID = 1L;
+public class FilesystemLookupFunction<P> extends TableFunction<RowData> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FilesystemLookupFunction.class);
 
 	// the max number of retries before throwing exception, in case of failure to load the table into cache
 	private static final int MAX_RETRIES = 3;
 	// interval between retries
 	private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
 
-	private final InputFormat<RowData, T> inputFormat;
-	// names and types of the records returned by the input format
-	private final String[] producedNames;
-	private final DataType[] producedTypes;
+	private final PartitionFetcher<P> partitionFetcher;
+	private final PartitionReader<P, RowData> partitionReader;
+	private final int[] lookupCols;

Review comment:
       lookupCols is never be used




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517762594



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);

Review comment:
       Absent should throw exception

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common user case is use hive dimension table and always lookup the latest partition data, in this case
+ * hive table source is a continuous read source but currently we implements it by LookupFunction. Because currently
+ * TableSource can not tell the downstream when the latest partition has been read finished. This is a temporarily
+ * workaround and will re-implement in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+					!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+					String.format(
+							"The '%s' is not supported when set '%s' to 'latest'",
+							STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+					monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					String.format(
+							"Currently the value of '%s' is required bigger or equal to default value '%s' " +
+									"when set '%s' to 'latest', but actual is '%s'",
+							STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+							DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+							STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+							monitorInterval.toMillis())
+			);
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(
+					"all".equals(partitionInclude),
+					String.format("The only supported %s for lookup is '%s' in batch source," +
+							" but actual is '%s'", STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+			hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
+		}
+	}
+
+	private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+		final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+		PartitionFetcher.Context<HiveTablePartition> fetcherContext = new HiveTablePartitionFetcherContext(
+				tablePath,
+				hiveShim,
+				new JobConfWrapper(jobConf),
+				catalogTable.getPartitionKeys(),
+				getProducedTableSchema().getFieldDataTypes(),
+				getProducedTableSchema().getFieldNames(),
+				configuration,
+				defaultPartitionName);
+
+		PartitionFetcher<HiveTablePartition> partitionFetcher;
+		if (catalogTable.getPartitionKeys().isEmpty()) {
+			// non-partitioned table, the fetcher fetches the partition which represents the given table.
+			partitionFetcher = context -> {
+				List<HiveTablePartition> partValueList = new ArrayList<>();
+				context.getNonPartitionedTablePartition().ifPresent(partValueList::add);

Review comment:
       `new HiveTablePartition(tableSd, tableProps)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9010",
       "triggerID" : "38db44425ddf7d79eb28ca6bd36dffdb81d456a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9029",
       "triggerID" : "13202dd2477bfb3730e2cb039311b1a0784368dd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8768964c023e7d241203485203ad5216958365a0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041",
       "triggerID" : "8768964c023e7d241203485203ad5216958365a0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149",
       "triggerID" : "aefdb5f32451d2a47b8cd57b46bc559ccde5b797",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8768964c023e7d241203485203ad5216958365a0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9041) 
   * aefdb5f32451d2a47b8cd57b46bc559ccde5b797 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9149) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r510028759



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##########
@@ -134,9 +188,48 @@ public void eval(Object... values) {
 		}
 	}
 
-	@VisibleForTesting
-	public Duration getCacheTTL() {
-		return cacheTTL;
+	private HiveTableInputFormat getHiveTableInputFormat() {

Review comment:
       Can we add a `copy(Partitions)` method to `HiveTableInputFormat`? Then we don't need move so many logicals.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "659b57b42486d3b79197653fec933ce42766388e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033",
       "triggerID" : "659b57b42486d3b79197653fec933ce42766388e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068",
       "triggerID" : "f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910",
       "triggerID" : "bc53ef3b7595acbc03bdaf1e01360cb802ff5d52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9124af4f501b2afeaefa94621a82612846d98783",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958",
       "triggerID" : "9124af4f501b2afeaefa94621a82612846d98783",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc53ef3b7595acbc03bdaf1e01360cb802ff5d52 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8910) 
   * 9124af4f501b2afeaefa94621a82612846d98783 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8958) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read latest partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517155968



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FilesystemLookupFunction.java
##########
@@ -51,125 +45,113 @@
 import java.util.stream.IntStream;
 
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup function for filesystem connector tables.
+ *
+ * <p>The hive connector and filesystem connector share read/write files code.
+ * Currently, only this function only used in hive connector.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
-
-	private static final long serialVersionUID = 1L;
+public class FilesystemLookupFunction<P> extends TableFunction<RowData> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FilesystemLookupFunction.class);
 
 	// the max number of retries before throwing exception, in case of failure to load the table into cache
 	private static final int MAX_RETRIES = 3;
 	// interval between retries
 	private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
 
-	private final InputFormat<RowData, T> inputFormat;
-	// names and types of the records returned by the input format
-	private final String[] producedNames;
-	private final DataType[] producedTypes;
+	private final PartitionFetcher<P> partitionFetcher;
+	private final PartitionReader<P, RowData> partitionReader;
+	private final int[] lookupCols;
+	private final RowData.FieldGetter[] lookupFieldGetters;
 	private final Duration cacheTTL;
+	private final TypeSerializer<RowData> serializer;
+	private final DataType[] fieldTypes;
+	private final String[] fieldNames;
 
-	// indices of lookup columns in the record returned by input format
-	private final int[] lookupCols;
-	// use Row as key for the cache
-	private transient Map<Row, List<RowData>> cache;
+	// cache for lookup data
+	private transient Map<RowData, List<RowData>> cache;
 	// timestamp when cache expires
 	private transient long nextLoadTime;
-	// serializer to copy RowData
-	private transient TypeSerializer<RowData> serializer;
-	// converters to convert data from internal to external in order to generate keys for the cache
-	private final DataFormatConverter[] converters;
 
-	public FileSystemLookupFunction(
-			InputFormat<RowData, T> inputFormat,
+	public FilesystemLookupFunction(
+			PartitionFetcher<P> partitionFetcher,
+			PartitionReader<P, RowData> partitionReader,
+			DataType[] fieldTypes,
+			String[] fieldNames,
 			String[] lookupKeys,

Review comment:
       Can you use `int[] lookupKeys`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r510029692



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##########
@@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
+public class HiveLookupFunction<T extends InputSplit> extends TableFunction<RowData> {

Review comment:
       I think we should a better code design, to make the components more decoupled. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org