Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/05 07:29:26 UTC

[GitHub] [flink] lirui-apache opened a new pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

lirui-apache opened a new pull request #11990:
URL: https://github.com/apache/flink/pull/11990


   
   ## What is the purpose of the change
   
   Implement `LookupableTableSource` for the Hive connector, so that Hive tables can be used as the lookup (dimension) side of a lookup join.
   
   
   ## Brief change log
   
     - Make `HiveTableSource` implement `LookupableTableSource`
     - Change `CommonLookupJoin` to call `TypeInfoDataTypeConverter` for the type conversion
     - Add test cases
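   For context: a lookup join enriches each incoming (probe) row with the rows of a dimension table whose lookup-key columns match the probe row's key values, and `LookupableTableSource` is how a table source offers that lookup capability to the planner. The following is a hypothetical, stdlib-only illustration of those semantics (inner-join variant, single key column). `LookupJoinSketch` and `lookupJoin` are made-up names, and none of Flink's API is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupJoinSketch {

    // Hypothetical illustration of lookup-join semantics, not Flink's API:
    // dimension rows are indexed by their key column, and every probe row
    // is joined with all dimension rows whose key equals the probe key.
    public static List<Object[]> lookupJoin(
            List<Object[]> probeRows, int probeKeyCol,
            List<Object[]> dimRows, int dimKeyCol) {
        // Build an index: key value -> all dimension rows with that key.
        Map<Object, List<Object[]>> index = new HashMap<>();
        for (Object[] dim : dimRows) {
            index.computeIfAbsent(dim[dimKeyCol], k -> new ArrayList<>()).add(dim);
        }
        List<Object[]> joined = new ArrayList<>();
        for (Object[] probe : probeRows) {
            List<Object[]> matches = index.get(probe[probeKeyCol]);
            if (matches == null) {
                continue; // inner-join semantics: unmatched probe rows are dropped
            }
            for (Object[] dim : matches) {
                // Emit the probe columns followed by the dimension columns.
                Object[] out = Arrays.copyOf(probe, probe.length + dim.length);
                System.arraycopy(dim, 0, out, probe.length, dim.length);
                joined.add(out);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Object[]> orders = Arrays.asList(
                new Object[]{1, "order-a"}, new Object[]{2, "order-b"}, new Object[]{3, "order-c"});
        List<Object[]> customers = Arrays.asList(
                new Object[]{1, "alice"}, new Object[]{2, "bob"});
        for (Object[] row : lookupJoin(orders, 0, customers, 0)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```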
   
   
   ## Verifying this change
   
   Existing and added test cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605",
       "triggerID" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c9a2b1d53715e766991efb85ec98ba96c14f0f7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=851",
       "triggerID" : "c9a2b1d53715e766991efb85ec98ba96c14f0f7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3591a7e419816e0dc6a13110e180df331726fa44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1113",
       "triggerID" : "3591a7e419816e0dc6a13110e180df331726fa44",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99f3bca89ea39ba4b354fee78dcc233394b6477e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "99f3bca89ea39ba4b354fee78dcc233394b6477e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3591a7e419816e0dc6a13110e180df331726fa44 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1113) 
   * 99f3bca89ea39ba4b354fee78dcc233394b6477e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] bowenli86 commented on a change in pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
bowenli86 commented on a change in pull request #11990:
URL: https://github.com/apache/flink/pull/11990#discussion_r422454846



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableLookupFunction.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Lookup table function for Hive tables.
+ */
+public class HiveTableLookupFunction extends TableFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final HiveTableInputFormat inputFormat;
+	// indices of lookup columns in the record returned by input format
+	private final int[] lookupCols;
+	private transient Map<RowData, List<RowData>> cache;
+	// timestamp when cache expires
+	private transient long cacheExpire;
+	private final Duration cacheTTL = Duration.ofHours(1);
+
+	public HiveTableLookupFunction(HiveTableInputFormat inputFormat, String[] lookupKeys) {
+		lookupCols = new int[lookupKeys.length];
+		String[] allFields = inputFormat.getFieldNames();
+		Map<String, Integer> nameToIndex = IntStream.range(0, allFields.length).boxed().collect(
+				Collectors.toMap(i -> allFields[i], i -> i));
+		List<Integer> selectedIndices = Arrays.stream(inputFormat.getSelectedFields()).boxed().collect(Collectors.toList());
+		for (int i = 0; i < lookupKeys.length; i++) {
+			Integer index = nameToIndex.get(lookupKeys[i]);
+			Preconditions.checkArgument(index != null, "Lookup keys %s not found in table schema", Arrays.toString(lookupKeys));
+			index = selectedIndices.indexOf(index);
+			Preconditions.checkArgument(index >= 0, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+			lookupCols[i] = index;
+		}
+		this.inputFormat = inputFormat;
+	}
+
+	@Override
+	public TypeInformation<RowData> getResultType() {
+		String[] allNames = inputFormat.getFieldNames();
+		DataType[] allTypes = inputFormat.getFieldTypes();
+		int[] selected = inputFormat.getSelectedFields();
+		return new RowDataTypeInfo(
+				Arrays.stream(selected).mapToObj(i -> allTypes[i].getLogicalType()).toArray(LogicalType[]::new),
+				Arrays.stream(selected).mapToObj(i -> allNames[i]).toArray(String[]::new));
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		super.open(context);
+		cache = new HashMap<>();
+		cacheExpire = -1;
+	}
+
+	public void eval(Object... values) {
+		Preconditions.checkArgument(values.length == lookupCols.length, "Number of values and lookup keys mismatch");
+		reloadCache();
+		RowData probeKey = GenericRowData.of(values);
+		List<RowData> matchedRows = cache.get(probeKey);
+		if (matchedRows != null) {
+			for (RowData matchedRow : matchedRows) {
+				collect(matchedRow);
+			}
+		}
+	}
+
+	private void reloadCache() {

Review comment:
       I'm really concerned about caching the whole table, especially since every parallel instance does so.
   
   We'd better add explicit documentation to warn users about the potential consequences and to describe best practices.
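   To put a number on this concern: since every parallel instance of the lookup function builds its own copy of the whole table, the cached footprint scales with both table size and parallelism. A rough, hypothetical estimate follows. The class name and the per-row size are assumptions, and JVM object and `HashMap` overhead are ignored, so real heap usage will be higher.

```java
public class LookupCacheFootprint {

    // Rough estimate only: assumes every parallel instance caches the full
    // table and that avgRowBytes approximates the in-memory size of one row.
    // Real heap usage is typically higher because of JVM object headers and
    // the per-key map and list structures, which are not counted here.
    public static long estimateTotalBytes(long rowCount, long avgRowBytes, int parallelism) {
        if (rowCount < 0 || avgRowBytes < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("rowCount/avgRowBytes must be >= 0 and parallelism > 0");
        }
        long perInstance = Math.multiplyExact(rowCount, avgRowBytes);
        return Math.multiplyExact(perInstance, (long) parallelism);
    }

    public static void main(String[] args) {
        // Hypothetical table: 10 million rows of ~200 bytes, lookup parallelism 8.
        long total = estimateTotalBytes(10_000_000L, 200L, 8);
        System.out.printf("~%.1f GiB across all instances%n", total / (1024.0 * 1024 * 1024));
    }
}
```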







[GitHub] [flink] lirui-apache commented on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-628633094


   The build has passed on my personal repo: https://dev.azure.com/lirui-apache/flink/_build/results?buildId=40&view=results





[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605",
       "triggerID" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9a940870922bfb7e92ea199e3affd893cbadd745 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605) 
   





[GitHub] [flink] lirui-apache commented on a change in pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on a change in pull request #11990:
URL: https://github.com/apache/flink/pull/11990#discussion_r422442698



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableLookupFunction.java
##########
@@ -0,0 +1,142 @@
+	private void reloadCache() {

Review comment:
       Not all rows of a Hive file, but all rows of a Hive table. If we cached only part of the data, we would have to do a full table scan on every cache miss, so we might as well cache all the data.
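   The trade-off described here (reload the whole table when the cache expires, rather than scan the table on every miss) can be sketched without Flink dependencies. This is a hypothetical stand-in for `reloadCache()`, whose body is not shown in the hunk, and not the PR's actual implementation. `tableScan` is an assumed `Supplier` returning every row of the table, rows are modeled as `Object[]`, and a key is a `List<Object>` of the lookup-column values. The field names only mirror `lookupCols`, `cacheTTL` and `cacheExpire` from the diff.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch: caches a whole table and reloads it after a TTL. */
public class FullTableTtlCache {

    private final Supplier<List<Object[]>> tableScan; // assumed full-table scan
    private final int[] lookupCols;                   // positions of the lookup columns in a row
    private final Duration cacheTTL;
    private final LongSupplier clockMillis;           // injectable clock, e.g. System::currentTimeMillis
    private Map<List<Object>, List<Object[]>> cache = new HashMap<>();
    private long cacheExpire = -1;                    // -1 forces a load on the first lookup
    private int loadCount = 0;

    public FullTableTtlCache(Supplier<List<Object[]>> tableScan, int[] lookupCols,
                             Duration cacheTTL, LongSupplier clockMillis) {
        this.tableScan = tableScan;
        this.lookupCols = lookupCols.clone();
        this.cacheTTL = cacheTTL;
        this.clockMillis = clockMillis;
    }

    /** Returns all rows whose lookup columns equal keyValues (possibly none). */
    public List<Object[]> lookup(Object... keyValues) {
        if (keyValues.length != lookupCols.length) {
            throw new IllegalArgumentException("Number of values and lookup keys mismatch");
        }
        reloadIfExpired();
        // A miss is just an absent map entry; it never triggers another scan.
        return cache.getOrDefault(Arrays.asList(keyValues), Collections.emptyList());
    }

    public int loadCount() {
        return loadCount;
    }

    private void reloadIfExpired() {
        long now = clockMillis.getAsLong();
        if (now < cacheExpire) {
            return; // cached copy is still within its TTL
        }
        // Scan the whole table once and group the rows by their lookup key.
        Map<List<Object>, List<Object[]>> fresh = new HashMap<>();
        for (Object[] row : tableScan.get()) {
            fresh.computeIfAbsent(keyOf(row), k -> new ArrayList<>()).add(row);
        }
        cache = fresh;
        cacheExpire = now + cacheTTL.toMillis();
        loadCount++;
    }

    private List<Object> keyOf(Object[] row) {
        List<Object> key = new ArrayList<>(lookupCols.length);
        for (int col : lookupCols) {
            key.add(row[col]);
        }
        return key;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        List<Object[]> table = new ArrayList<>();
        table.add(new Object[]{1, "alice"});
        table.add(new Object[]{2, "bob"});
        FullTableTtlCache cache = new FullTableTtlCache(
                () -> table, new int[]{0}, Duration.ofHours(1), () -> now[0]);
        System.out.println(cache.lookup(2).size()); // first lookup loads the table
        System.out.println(cache.lookup(3).size()); // miss, served without a new scan
        now[0] += Duration.ofHours(1).toMillis();   // TTL elapsed
        cache.lookup(1);                            // triggers a full reload
        System.out.println(cache.loadCount());
    }
}
```

   The design choice it illustrates: a miss costs only a hash lookup because the whole table is already resident, at the price of holding the full table in every instance and re-scanning it once per TTL.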







[GitHub] [flink] bowenli86 commented on a change in pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
bowenli86 commented on a change in pull request #11990:
URL: https://github.com/apache/flink/pull/11990#discussion_r422583979



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableLookupFunction.java
##########
@@ -0,0 +1,142 @@
+	private void reloadCache() {

Review comment:
       @lirui-apache thanks for the explanation!
   







[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605",
       "triggerID" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c9a2b1d53715e766991efb85ec98ba96c14f0f7b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c9a2b1d53715e766991efb85ec98ba96c14f0f7b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9a940870922bfb7e92ea199e3affd893cbadd745 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605) 
   * c9a2b1d53715e766991efb85ec98ba96c14f0f7b UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605",
       "triggerID" : "9a940870922bfb7e92ea199e3affd893cbadd745",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c9a2b1d53715e766991efb85ec98ba96c14f0f7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=851",
       "triggerID" : "c9a2b1d53715e766991efb85ec98ba96c14f0f7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3591a7e419816e0dc6a13110e180df331726fa44",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1113",
       "triggerID" : "3591a7e419816e0dc6a13110e180df331726fa44",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99f3bca89ea39ba4b354fee78dcc233394b6477e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1279",
       "triggerID" : "99f3bca89ea39ba4b354fee78dcc233394b6477e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99f3bca89ea39ba4b354fee78dcc233394b6477e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1279) 
   





[GitHub] [flink] bowenli86 commented on a change in pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
bowenli86 commented on a change in pull request #11990:
URL: https://github.com/apache/flink/pull/11990#discussion_r422294096



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableLookupFunction.java
##########
@@ -0,0 +1,142 @@
+	private void reloadCache() {

Review comment:
       Does this mean it caches all rows of a Hive file?
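The answer appears to be yes, judging from `checkCacheReload()` in the `FileSystemLookupFunction` revision quoted later in this thread. Each task reads every input split into a `HashMap` keyed by the lookup columns. It then rebuilds that map once the TTL has passed, so memory use grows with the size of the whole table. The sketch below isolates that pattern in plain Java. `FullTableCache`, its `Supplier`-based loader and the injectable clock are hypothetical names chosen for illustration. They are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not a Flink class): caches ALL rows of a table,
 * grouped by lookup key, and rebuilds the whole map once the TTL expires.
 */
final class FullTableCache<K, R> {
    private final Supplier<List<R>> loader;   // reads every row of the table
    private final Function<R, K> keyOf;       // projects the lookup columns
    private final long ttlMillis;
    private final LongSupplier clock;         // e.g. System::currentTimeMillis
    private final Map<K, List<R>> cache = new HashMap<>();
    private long nextLoadTime = -1;           // -1 forces a load on the first lookup
    private int loadCount;

    FullTableCache(Supplier<List<R>> loader, Function<R, K> keyOf, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.keyOf = keyOf;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    List<R> lookup(K key) {
        // Same expiry rule as the quoted code: reload once nextLoadTime has been reached.
        if (nextLoadTime <= clock.getAsLong()) {
            reload();
        }
        return cache.getOrDefault(key, Collections.emptyList());
    }

    private void reload() {
        cache.clear();
        // The entire table is materialized in this task's memory.
        for (R row : loader.get()) {
            cache.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<>()).add(row);
        }
        loadCount++;
        nextLoadTime = clock.getAsLong() + ttlMillis;
    }

    int loadCount() {
        return loadCount;
    }
}
```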




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623902687


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 9a940870922bfb7e92ea199e3affd893cbadd745 (Tue May 05 07:35:02 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   ## CI report:
   
   * c9a2b1d53715e766991efb85ec98ba96c14f0f7b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=851) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #11990:
URL: https://github.com/apache/flink/pull/11990#discussion_r424982468



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##########
@@ -366,6 +375,12 @@ public boolean isLimitPushedDown() {
 			List<String> partitionColNames = catalogTable.getPartitionKeys();
 			Table hiveTable = client.getTable(dbName, tableName);
 			Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+			String ttlStr = tableProps.getProperty(FileSystemOptions.LOOKUP_JOIN_CACHE_TTL.key());
+			Configuration configuration = new Configuration();

Review comment:
       You can just use `TimeUtils.parseDuration`

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Lookup table function for filesystem connector tables.
+ */
+public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final InputFormat<RowData, T> inputFormat;
+	private final LookupContext context;
+	// indices of lookup columns in the record returned by input format
+	private final int[] lookupCols;
+	// use Row as key because we'll get external data in eval
+	private transient Map<Row, List<RowData>> cache;
+	// timestamp when cache expires
+	private transient long nextLoadTime;
+	// serializer to copy RowData
+	private transient TypeSerializer<RowData> serializer;
+	// converters to convert data from internal to external in order to generate keys for the cache
+	private final DataFormatConverter[] converters;
+
+	public FileSystemLookupFunction(InputFormat<RowData, T> inputFormat, String[] lookupKeys, LookupContext context) {
+		lookupCols = new int[lookupKeys.length];
+		converters = new DataFormatConverter[lookupKeys.length];
+		Map<String, Integer> nameToIndex = IntStream.range(0, context.selectedNames.length).boxed().collect(
+				Collectors.toMap(i -> context.selectedNames[i], i -> i));
+		for (int i = 0; i < lookupKeys.length; i++) {
+			Integer index = nameToIndex.get(lookupKeys[i]);
+			Preconditions.checkArgument(index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+			converters[i] = DataFormatConverters.getConverterForDataType(context.selectedTypes[index]);
+			lookupCols[i] = index;
+		}
+		this.inputFormat = inputFormat;
+		this.context = context;
+	}
+
+	@Override
+	public TypeInformation<RowData> getResultType() {
+		return new RowDataTypeInfo(
+				Arrays.stream(context.selectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+				context.selectedNames);
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		super.open(context);
+		cache = new HashMap<>();
+		nextLoadTime = -1;
+		// TODO: get ExecutionConfig from context?
+		serializer = getResultType().createSerializer(new ExecutionConfig());
+	}
+
+	public void eval(Object... values) {
+		Preconditions.checkArgument(values.length == lookupCols.length, "Number of values and lookup keys mismatch");
+		checkCacheReload();
+		Row probeKey = Row.of(values);
+		List<RowData> matchedRows = cache.get(probeKey);
+		if (matchedRows != null) {
+			for (RowData matchedRow : matchedRows) {
+				collect(matchedRow);
+			}
+		}
+	}
+
+	@VisibleForTesting
+	public Duration getCacheTTL() {
+		return context.cacheTTL;
+	}
+
+	private void checkCacheReload() {
+		if (nextLoadTime > System.currentTimeMillis()) {
+			return;
+		}
+		cache.clear();
+		try {
+			T[] inputSplits = inputFormat.createInputSplits(1);
+			GenericRowData reuse = new GenericRowData(context.selectedNames.length);
+			for (T split : inputSplits) {
+				inputFormat.open(split);
+				while (!inputFormat.reachedEnd()) {
+					RowData row = inputFormat.nextRecord(reuse);
+					Row key = extractKey(row);
+					List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
+					rows.add(serializer.copy(row));
+				}
+				inputFormat.close();
+			}
+			nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Failed to load table into cache", e);
+		}
+	}
+
+	private Row extractKey(RowData row) {
+		Row key = new Row(lookupCols.length);
+		for (int i = 0; i < lookupCols.length; i++) {
+			key.setField(i, converters[i].toExternal(row, lookupCols[i]));
+		}
+		return key;
+	}
+
+	/**
+	 * A class to store context information for the lookup function.
+	 */
+	public static class LookupContext implements Serializable {

Review comment:
       Is this class really needed?

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.FileSystemOptions;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test lookup join of hive tables.
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class HiveLookupJoinTest {
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
+
+	@Test
+	public void test() throws Exception {
+		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
+		TableEnvironment tableEnv = TableEnvironment.create(settings);
+		HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog(hiveShell.getHiveConf());
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());

Review comment:
       Move this setup code into a `@Before` method.

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
##########
@@ -0,0 +1,96 @@
+
+		hiveShell.execute(String.format("create table build (x int,y string,z int) tblproperties ('%s'='5min')",

Review comment:
       Use Hive DDL in Flink here instead of `hiveShell`.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
##########
@@ -0,0 +1,169 @@
+	public void eval(Object... values) {
+		Preconditions.checkArgument(values.length == lookupCols.length, "Number of values and lookup keys mismatch");
+		checkCacheReload();
+		Row probeKey = Row.of(values);

Review comment:
       The values are internal data structures; they should be converted to external ones.
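This illustrates the comment. The cache keys are built with `converters[i].toExternal(...)`, so they hold external Java objects. Per this review comment, `eval` receives the key values in Flink's internal representation. For types whose internal and external forms differ (strings, for example), a key built directly from those values never `equals` a cached key, and the lookup silently misses. The self-contained sketch below reproduces the mismatch. `InternalString` is a hypothetical stand-in for an internal string type. `List<Object>` replaces `Row` so the example needs no Flink dependency. In the real function, one plausible fix is to run each probe value through the matching entry of the existing `converters` array before building the key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an internal binary string type (NOT a Flink class). */
final class InternalString {
    private final byte[] utf8;

    InternalString(String s) {
        this.utf8 = s.getBytes(StandardCharsets.UTF_8);
    }

    String toExternal() {
        return new String(utf8, StandardCharsets.UTF_8);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof InternalString && Arrays.equals(utf8, ((InternalString) o).utf8);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(utf8);
    }
}

final class ProbeKeyDemo {
    /** Cache keyed by EXTERNAL values, like keys built with toExternal(...). */
    static Map<List<Object>, String> buildCache() {
        Map<List<Object>, String> cache = new HashMap<>();
        cache.put(Arrays.<Object>asList("alice"), "row-for-alice");
        return cache;
    }

    /** Converts internal probe values to their external form before building the key. */
    static List<Object> toExternalKey(Object... internalValues) {
        Object[] external = new Object[internalValues.length];
        for (int i = 0; i < internalValues.length; i++) {
            Object v = internalValues[i];
            external[i] = v instanceof InternalString ? ((InternalString) v).toExternal() : v;
        }
        return Arrays.asList(external);
    }
}
```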







[GitHub] [flink] JingsongLi merged pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #11990:
URL: https://github.com/apache/flink/pull/11990


   





[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   ## CI report:
   
   * 9a940870922bfb7e92ea199e3affd893cbadd745 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605) 
   * c9a2b1d53715e766991efb85ec98ba96c14f0f7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=851) 
   





[GitHub] [flink] flinkbot commented on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   ## CI report:
   
   * 9a940870922bfb7e92ea199e3affd893cbadd745 UNKNOWN
   





[GitHub] [flink] lirui-apache commented on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-625145411


   @JingsongLi Could you have a look? Thanks.





[GitHub] [flink] lirui-apache commented on a change in pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on a change in pull request #11990:
URL: https://github.com/apache/flink/pull/11990#discussion_r422464266



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableLookupFunction.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Lookup table function for Hive tables.
+ */
+public class HiveTableLookupFunction extends TableFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final HiveTableInputFormat inputFormat;
+	// indices of lookup columns in the record returned by input format
+	private final int[] lookupCols;
+	private transient Map<RowData, List<RowData>> cache;
+	// timestamp when cache expires
+	private transient long cacheExpire;
+	private final Duration cacheTTL = Duration.ofHours(1);
+
+	public HiveTableLookupFunction(HiveTableInputFormat inputFormat, String[] lookupKeys) {
+		lookupCols = new int[lookupKeys.length];
+		String[] allFields = inputFormat.getFieldNames();
+		Map<String, Integer> nameToIndex = IntStream.range(0, allFields.length).boxed().collect(
+				Collectors.toMap(i -> allFields[i], i -> i));
+		List<Integer> selectedIndices = Arrays.stream(inputFormat.getSelectedFields()).boxed().collect(Collectors.toList());
+		for (int i = 0; i < lookupKeys.length; i++) {
+			Integer index = nameToIndex.get(lookupKeys[i]);
+			Preconditions.checkArgument(index != null, "Lookup keys %s not found in table schema", Arrays.toString(lookupKeys));
+			index = selectedIndices.indexOf(index);
+			Preconditions.checkArgument(index >= 0, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+			lookupCols[i] = index;
+		}
+		this.inputFormat = inputFormat;
+	}
+
+	@Override
+	public TypeInformation<RowData> getResultType() {
+		String[] allNames = inputFormat.getFieldNames();
+		DataType[] allTypes = inputFormat.getFieldTypes();
+		int[] selected = inputFormat.getSelectedFields();
+		return new RowDataTypeInfo(
+				Arrays.stream(selected).mapToObj(i -> allTypes[i].getLogicalType()).toArray(LogicalType[]::new),
+				Arrays.stream(selected).mapToObj(i -> allNames[i]).toArray(String[]::new));
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		super.open(context);
+		cache = new HashMap<>();
+		cacheExpire = -1;
+	}
+
+	public void eval(Object... values) {
+		Preconditions.checkArgument(values.length == lookupCols.length, "Number of values and lookup keys mismatch");
+		reloadCache();
+		RowData probeKey = GenericRowData.of(values);
+		List<RowData> matchedRows = cache.get(probeKey);
+		if (matchedRows != null) {
+			for (RowData matchedRow : matchedRows) {
+				collect(matchedRow);
+			}
+		}
+	}
+
+	private void reloadCache() {

Review comment:
       And your concern is valid. We will document this explicitly.
   BTW, in our internal implementation we can do a partitioned lookup join, so that each task only has to cache the data belonging to a specific shuffle partition, which reduces memory consumption. But I don't think that optimization will make it into 1.11.
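The internal partitioned implementation mentioned above is not shown in this thread, so the following is only a generic sketch of the idea. All names in it are hypothetical. The build side is hash-partitioned on the lookup key so that each of N tasks caches roughly 1/N of the rows. This only works if the probe stream is shuffled with the same partitioning function, so that every probe reaches the task that owns its key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of a partitioned lookup cache; not Flink's implementation. */
final class PartitionedLookupCache<K, R> {
    private final Map<K, List<R>> cache = new HashMap<>();

    /** Shared partitioning function; the probe side must be shuffled with the same one. */
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Keeps only the rows whose key belongs to this task's partition. */
    PartitionedLookupCache(List<R> allRows, Function<R, K> keyOf, int taskIndex, int numTasks) {
        for (R row : allRows) {
            K key = keyOf.apply(row);
            if (partitionOf(key, numTasks) == taskIndex) {
                cache.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
    }

    List<R> lookup(K key) {
        return cache.getOrDefault(key, Collections.emptyList());
    }

    /** Number of rows held by this task. */
    int size() {
        int n = 0;
        for (List<R> rows : cache.values()) {
            n += rows.size();
        }
        return n;
    }
}
```

For example, with 4 tasks and 100 distinct integer keys (0 to 99), each task would hold 25 rows instead of all 100.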




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   ## CI report:
   
   * 9a940870922bfb7e92ea199e3affd893cbadd745 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=605) 
   
   Bot commands:
   
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build





[GitHub] [flink] flinkbot edited a comment on pull request #11990: [FLINK-17387][hive] Implement LookupableTableSource for Hive connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11990:
URL: https://github.com/apache/flink/pull/11990#issuecomment-623910490


   ## CI report:
   
   * 3591a7e419816e0dc6a13110e180df331726fa44 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1113) 
   * 99f3bca89ea39ba4b354fee78dcc233394b6477e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1279) 
   

