Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/18 12:04:29 UTC

[GitHub] [flink] anlen321 opened a new pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

anlen321 opened a new pull request #14684:
URL: https://github.com/apache/flink/pull/14684


   
   ## What is the purpose of the change
   Support async lookup for HBase connector.
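
As a usage illustration (not part of the original PR text): a minimal, hedged sketch of how the new async lookup would be exercised from the Table API. The table names, schema, and ZooKeeper address below are placeholders, and the `orders` source table is assumed to already exist with a `proc_time` processing-time attribute; the option keys follow the lookup options this PR introduces.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AsyncHBaseLookupExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // HBase-backed dimension table; 'lookup.async' switches the source to
        // the new HBaseRowDataAsyncLookupFunction code path.
        tEnv.executeSql(
                "CREATE TABLE dim_hbase ("
                        + " rowkey STRING,"
                        + " f1 ROW<name STRING, age INT>,"
                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'hbase-2.2',"
                        + " 'table-name' = 'dim',"
                        + " 'zookeeper.quorum' = 'localhost:2181',"
                        + " 'lookup.async' = 'true',"
                        + " 'lookup.cache.max-rows' = '1000',"
                        + " 'lookup.cache.ttl' = '10min',"
                        + " 'lookup.max-retries' = '3'"
                        + ")");

        // A processing-time temporal join against the dimension table invokes
        // the (async) lookup function; the join key must be the HBase rowkey.
        tEnv.executeSql(
                "SELECT o.order_id, d.f1.name"
                        + " FROM orders AS o"
                        + " JOIN dim_hbase FOR SYSTEM_TIME AS OF o.proc_time AS d"
                        + " ON o.user_id = d.rowkey");
    }
}
```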
   
   
   ## Brief change log
1. Add HBaseRowDataAsyncLookupFunction to support async lookup for HBase.
2. Add HBaseLookupOptions to manage lookup options.
3. Modify HBase2DynamicTableFactory to append the lookup options.
4. Modify HBaseDynamicTableSource#getLookupRuntimeProvider to support AsyncTableFunctionProvider.
5. Modify HBaseSerde#convertToRow so it can return a new object on each call, allowing results to be stored directly in the cache.
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
1. HBaseDynamicTableFactoryTest#testLookupOptions
2. HBaseDynamicTableFactoryTest#testLookupAsync
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762817601


   I'll take a look at this PR, @wuchong.
   
   @anlen321 It looks like you fixed the conflicts with the `git merge` command; it's recommended to use `git rebase master` to fix conflicts instead. Could you squash your changes into a single commit so we can review? Currently your two commits are mixed up with others', which makes them hard to read.
   ![image](https://user-images.githubusercontent.com/5163645/105036057-ea740b80-5a96-11eb-858f-23f488fa80ef.png)
   
   
   





[GitHub] [flink] zhisheng17 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
zhisheng17 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r560152221



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,211 @@
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**

Review comment:
       You can remove the Author/Date/description message from the class Javadoc.







[GitHub] [flink] zhisheng17 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
zhisheng17 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r560152830



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,211 @@
+package org.apache.flink.connector.hbase2.source;

Review comment:
       You should add the Apache license header.

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
##########
@@ -0,0 +1,105 @@
+package org.apache.flink.connector.hbase.options;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * @Author: TingWuHuang

Review comment:
       Also here.







[GitHub] [flink] leonardBang edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762817601


   I'll take a look at this PR, @wuchong.
   
   @anlen321 It looks like you fixed the conflicts with the `git merge` command; it's recommended to use `git rebase master` to fix conflicts instead. Could you squash your changes into a single commit so we can review? Currently your two commits are mixed up with others', which makes them hard to read.
   ![image](https://user-images.githubusercontent.com/5163645/105036057-ea740b80-5a96-11eb-858f-23f488fa80ef.png)
   
   
   





[GitHub] [flink] wuchong merged pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong merged pull request #14684:
URL: https://github.com/apache/flink/pull/14684


   





[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r587466648



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
       Yes, I will do it.







[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * c34d423032cb900ffad161afdb7f0b14fe7a824a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14610) 
   * 3ebebb69b4975c436a8980b5052a0127e5806b94 UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762280677


   > @anlen321 could you rebase your branch to the latest master to resolve the conflicts?
   
   OK, I will resolve it.





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 063d767c14cfdff548b7eb428cae3a6a9f9e236e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13882) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * f26efadf325bb52d3bc8b2711fc83a169ad61ca5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409) 
   * c7aa55d73e52cd7e010fb5370bdc4f3cf9833133 UNKNOWN
   





[GitHub] [flink] anlen321 edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-798913741


   > I have rebased and squashed the branch to trigger the build again. Will merge it once the build passes.
   
   Thank you very much!





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * f26efadf325bb52d3bc8b2711fc83a169ad61ca5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409) 
   





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-766092286


   Hi @leonardBang, I have submitted the new commit; please help to review it. Thanks!





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 063d767c14cfdff548b7eb428cae3a6a9f9e236e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13882) 
   * 0375eb67f1ccb8460862baea733795874ba102f3 UNKNOWN
   





[GitHub] [flink] wuchong commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r591233115



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()

Review comment:
       It would be safer to use `cacheMaxSize <= 0 || cacheExpireMs <= 0`.
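
       A minimal sketch of that guard, using the same fields as in the diff above (extracted into a helper only for illustration):

    ```java
    import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    import org.apache.flink.table.data.RowData;

    import java.util.concurrent.TimeUnit;

    public class CacheGuardSketch {
        /** Any non-positive size or TTL disables caching, instead of matching only the sentinels -1 and 0. */
        static Cache<Object, RowData> createCache(long cacheMaxSize, long cacheExpireMs) {
            if (cacheMaxSize <= 0 || cacheExpireMs <= 0) {
                return null; // caching disabled
            }
            return CacheBuilder.newBuilder()
                    .recordStats()
                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                    .maximumSize(cacheMaxSize)
                    .build();
        }
    }
    ```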

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return
+     */
+    public RowData convertToRow(Result result, Boolean needReuse) {

Review comment:
       Use primitive `boolean` instead of `Boolean` if it should never be null. 
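
       For example, a sketch of the adjusted signature: `public RowData convertToRow(Result result, boolean needReuse)`.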

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return
+     */
+    public RowData convertToRow(Result result, Boolean needReuse) {
+        if (!needReuse){
+            // The output rows needs to be initialized each time
+            // to prevent the possibility of putting the output object into the cache.
+            reusedRow = new GenericRowData(fieldLength);

Review comment:
       If not reusing, we shouldn't assign a new object to `reusedRow`; otherwise, the new object will be mutated on the next call.
   
       We can have local variables `reusedRow` and `reusedFamilyRows`, and assign them according to the `needReuse` flag.
   
       Besides, please add a test for this.

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                .recordStats()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+            if (cache != null && context != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {

Review comment:
       `feature` ==> `future`
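
       That is, the renamed signature would read: `public void eval(CompletableFuture<Collection<RowData>> future, Object rowKey)`.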

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return

Review comment:
       Remove the `@return` tag if its Javadoc is empty.
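
       For example, a sketch of the cleaned-up Javadoc (either document the return value or drop the tag):

    ```java
    /**
     * Converts HBase {@link Result} into {@link RowData}.
     *
     * @param result result of a query
     * @param needReuse whether the returned RowData may reuse internal buffers
     * @return the converted row
     */
    ```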

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                .recordStats()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+            if (cache != null && context != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param resultFuture The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> resultFuture, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> responseFuture = table.get(get);
+        responseFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        resultFuture.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("HBase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            resultFuture.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                resultFuture.completeExceptionally(e1);
+                            }
+                            fetchResult(resultFuture, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    if (result.isEmpty()) {
+                        resultFuture.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        if (cache != null){
+                            RowData rowData = serde.convertToRow(result, false);
+                            resultFuture.complete(Collections.singletonList(rowData));
+                            cache.put(rowKey, rowData);
+                        } else {
+                            resultFuture.complete(Collections.singletonList(serde.convertToRow(result, true)));
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {

Review comment:
       Why not close `table`?

##########
File path: flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -122,6 +127,10 @@ public String factoryIdentifier() {
         set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
         set.add(SINK_PARALLELISM);
+        set.add(LOOKUP_ASYNC);

Review comment:
       We don't need to add this option if we don't support it. 

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return
+     */
+    public RowData convertToRow(Result result, Boolean needReuse) {
+        if (!needReuse){
+            // The output rows needs to be initialized each time
+            // to prevent the possibility of putting the output object into the cache.
+            reusedRow = new GenericRowData(fieldLength);

Review comment:
       This does have a bug because this method is not thread-safe: when it is called by multiple threads, all of them will get the same result object. I think we can have two methods and share the common logic.
   
   ```java
   /**
        * Converts HBase {@link Result} into a new {@link RowData} instance.
        *
        * <p>Note: this method is thread-safe.
        */
       public RowData convertToNewRow(Result result) {
           // The output rows needs to be initialized each time
           // to prevent the possibility of putting the output object into the cache.
           GenericRowData resultRow = new GenericRowData(fieldLength);
           GenericRowData[] familyRows = new GenericRowData[families.length];
           for (int f = 0; f < families.length; f++) {
               familyRows[f] = new GenericRowData(qualifiers[f].length);
           }
           return convertToRow(result, resultRow, familyRows);
       }
   
       /**
        * Converts HBase {@link Result} into a reused {@link RowData} instance.
        *
        * <p>Note: this method is NOT thread-safe.
        */
       public RowData convertToReusedRow(Result result) {
           return convertToRow(result, reusedRow, reusedFamilyRows);
       }
   
       private RowData convertToRow(
               Result result, GenericRowData resultRow, GenericRowData[] familyRows) {
           for (int i = 0; i < fieldLength; i++) {
               if (rowkeyIndex == i) {
                   assert keyDecoder != null;
                   Object rowkey = keyDecoder.decode(result.getRow());
                   resultRow.setField(rowkeyIndex, rowkey);
               } else {
                   int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
                   // get family key
                   byte[] familyKey = families[f];
                   GenericRowData familyRow = familyRows[f];
                   for (int q = 0; q < this.qualifiers[f].length; q++) {
                       // get quantifier key
                       byte[] qualifier = qualifiers[f][q];
                       // read value
                       byte[] value = result.getValue(familyKey, qualifier);
                       familyRow.setField(q, qualifierDecoders[f][q].decode(value));
                   }
                   resultRow.setField(i, familyRow);
               }
           }
           return resultRow;
       }
   ```
   
   

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link

Review comment:
       The javadoc is incorrect. Table API doesn't support `AsyncTableFunction`; thus, this implementation can only be an internal class and can't be used by users directly.
   
   You can simply describe the class as an implementation that looks up HBase data by rowkey in an async fashion.
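   
   For illustration, the revised javadoc could read roughly like this (a wording sketch, not the final text):
   
   ```java
   /**
    * An internal implementation of {@link AsyncTableFunction} that looks up
    * HBase data by rowkey in an async fashion and emits the result as
    * {@link RowData}. Not intended to be used by users directly.
    */
   ```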

##########
File path: docs/content/docs/connectors/table/hbase.md
##########
@@ -166,6 +166,34 @@ Connector Options
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
     </tr>
+    <tr>
+      <td><h5>lookup.async</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether async lookup are supported. If true, the lookup will be async. Note, async only supports hbase-2.2 connector.</td>

Review comment:
       ```suggestion
      <td>Whether async lookup is enabled. If true, the lookup will be async. Note, async only supports hbase-2.2 connector.</td>
   ```

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
##########
@@ -35,17 +42,45 @@ public HBaseDynamicTableSource(
             Configuration conf,
             String tableName,
             HBaseTableSchema hbaseSchema,
-            String nullStringLiteral) {
-        super(conf, tableName, hbaseSchema, nullStringLiteral);
+            String nullStringLiteral,
+            HBaseLookupOptions lookupOptions) {
+        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        checkArgument(context.getKeys().length == 1 && context.getKeys()[0].length == 1,
+            "Currently, HBase table can only be lookup by single rowkey.");
+        checkArgument(
+            hbaseSchema.getRowKeyName().isPresent(),
+            "HBase schema must have a row key when used in lookup mode.");
+        checkArgument(
+            hbaseSchema
+                .convertsToTableSchema()
+                .getTableColumn(context.getKeys()[0][0])
+                .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
+                .isPresent(),
+            "Currently, HBase table only supports lookup by rowkey field.");
+        if (lookupOptions.getLookupAsync()){
+            return AsyncTableFunctionProvider.of(new HBaseRowDataAsyncLookupFunction(conf, tableName, hbaseSchema,
+                nullStringLiteral, lookupOptions));
+        }
+        return TableFunctionProvider.of(new HBaseRowDataLookupFunction(conf, tableName, hbaseSchema, nullStringLiteral,

Review comment:
       Nit: this can be put in an `else` branch.
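   
   For illustration, the nit amounts to the following (a sketch based on the diff above; the sync constructor's trailing arguments are truncated in the diff, so the final `lookupOptions` argument here is an assumption):
   
   ```java
   if (lookupOptions.getLookupAsync()) {
       return AsyncTableFunctionProvider.of(
               new HBaseRowDataAsyncLookupFunction(
                       conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions));
   } else {
       return TableFunctionProvider.of(
               new HBaseRowDataLookupFunction(
                       conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions));
   }
   ```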

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());

Review comment:
       IIUC, the executor will be used to execute callbacks. Using `directExecutor` means the callbacks can't be executed asynchronously, which may cause performance problems. I think we should create an `Executors.newFixedThreadPool` for this.
   
   See example: https://github.com/apache/hbase/blob/master/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
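   
   A minimal sketch of the suggested change (the pool size and field name are illustrative; note this is `java.util.concurrent.Executors`, not the Flink `Executors` imported in the diff):
   
   ```java
   // in open(): create a dedicated pool so HBase client callbacks
   // do not all run on the caller thread (pool size is illustrative)
   ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(16);
   table = asyncConnection.getTable(TableName.valueOf(hTableName), threadPool);
   
   // in close(): release the pool together with the connection
   threadPool.shutdownNow();
   ```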

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
##########
@@ -581,6 +581,82 @@ public void testHBaseLookupTableSource() {
         assertEquals(expected, result);
     }
 
+    @Test
+    public void testHBaseAsyncLookupTableSource() {

Review comment:
       This test is a duplicate of `testHBaseLookupTableSource`. You can share the common logic between them, e.g. extract a common testing method:
   
   ```java
   private void verifyHBaseLookupJoin(boolean async) {
     ...
   }
   ```
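   
   For illustration, the two tests would then reduce to thin wrappers around the shared method (a sketch; the shared method is assumed to carry the table setup and assertions):
   
   ```java
   @Test
   public void testHBaseLookupTableSource() {
       verifyHBaseLookupJoin(false);
   }
   
   @Test
   public void testHBaseAsyncLookupTableSource() {
       verifyHBaseLookupJoin(true);
   }
   ```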

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
+
+    @Override
+    protected PlannerType planner() {
+        // lookup table source is only supported in blink planner
+        return PlannerType.BLINK_PLANNER;
+    }
+
+    @Test
+    public void testEval() throws Exception {
+        HBaseRowDataAsyncLookupFunction lookupFunction = buildRowDataAsyncLookupFunction();
+
+        lookupFunction.open(null);
+        List<RowData> list = new ArrayList<>();
+        int[] rowKey = {1, 2, 3};
+        for (int i = 0; i < rowKey.length; i++){
+            CompletableFuture<Collection<RowData>> feature = new CompletableFuture<>();
+            lookupFunction.eval(feature, rowKey[i]);
+            list.add(feature.get().iterator().next());
+        }
+        lookupFunction.close();
+        List<String> result =
+            Lists.newArrayList(list).stream()
+                .map(RowData::toString)
+                .sorted()
+                .collect(Collectors.toList());

Review comment:
       The above test should also cover non-existent keys, and please also test it using a no-cache async lookup function.
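   
   A no-cache function can be built by leaving the cache options at their defaults, since the PR disables the cache when max-rows is -1 or ttl is 0. A sketch, assuming `HBaseLookupOptions` exposes a builder and the test helper accepts options (both are assumptions for illustration):
   
   ```java
   // cache stays disabled with default options (max-rows = -1, ttl = 0)
   HBaseLookupOptions noCacheOptions = HBaseLookupOptions.builder().build();
   HBaseRowDataAsyncLookupFunction noCacheLookup =
           buildRowDataAsyncLookupFunction(noCacheOptions);
   ```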

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
+
+    @Override
+    protected PlannerType planner() {
+        // lookup table source is only supported in blink planner
+        return PlannerType.BLINK_PLANNER;
+    }
+
+    @Test
+    public void testEval() throws Exception {
+        HBaseRowDataAsyncLookupFunction lookupFunction = buildRowDataAsyncLookupFunction();
+
+        lookupFunction.open(null);
+        List<RowData> list = new ArrayList<>();
+        int[] rowKey = {1, 2, 3};
+        for (int i = 0; i < rowKey.length; i++){
+            CompletableFuture<Collection<RowData>> feature = new CompletableFuture<>();
+            lookupFunction.eval(feature, rowKey[i]);
+            list.add(feature.get().iterator().next());
+        }
+        lookupFunction.close();
+        List<String> result =
+            Lists.newArrayList(list).stream()
+                .map(RowData::toString)
+                .sorted()
+                .collect(Collectors.toList());

Review comment:
       This makes the async lookup synchronous and doesn't test the feature. I suggest changing it to an async fashion, which also reveals a bug in HBaseSerde. You can use the following test:
   
   
   ```java
    final List<String> result = new ArrayList<>();
    int[] rowkeys = {1, 2, 1, 12, 3, 12, 4, 3};
    CountDownLatch latch = new CountDownLatch(rowkeys.length);
    for (int rowkey : rowkeys) {
        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
        lookupFunction.eval(future, rowkey);
        future.whenComplete(
                (rs, t) -> {
                    synchronized (result) {
                        if (rs.isEmpty()) {
                            result.add(rowkey + ": null");
                        } else {
                            rs.forEach(row -> result.add(rowkey + ": " + row.toString()));
                        }
                    }
                    latch.countDown();
                });
    }

    // this verifies lookup calls are async, i.e. some results are still pending here
    assertTrue(result.size() < rowkeys.length);
    latch.await();

    assertEquals(...);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r570712863



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
##########
@@ -35,17 +42,45 @@ public HBaseDynamicTableSource(
             Configuration conf,
             String tableName,
             HBaseTableSchema hbaseSchema,
-            String nullStringLiteral) {
-        super(conf, tableName, hbaseSchema, nullStringLiteral);
+            String nullStringLiteral,
+            HBaseLookupOptions lookupOptions) {
+        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        checkArgument(context.getKeys().length == 1 && context.getKeys()[0].length == 1,
+            "Currently, HBase table can only be lookup by single rowkey.");
+        checkArgument(
+            hbaseSchema.getRowKeyName().isPresent(),
+            "HBase schema must have a row key when used in lookup mode.");
+        checkArgument(
+            hbaseSchema
+                .convertsToTableSchema()
+                .getTableColumn(context.getKeys()[0][0])
+                .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
+                .isPresent(),
+            "Currently, HBase table only supports lookup by rowkey field.");
+        boolean isAsync = lookupOptions.getLookupAsync();
+        if (isAsync){

Review comment:
       ```suggestion
           if (lookupOptions.getLookupAsync()){
   ```

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+

Review comment:
       redundant blank lines

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
                                     + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+        ConfigOptions.key("lookup.async")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("whether to set async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+        ConfigOptions.key("lookup.cache.max-rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription(
+                "the max number of rows of lookup cache, over this value, the oldest rows will "
+                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                    + "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+        ConfigOptions.key("lookup.cache.ttl")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(0))
+            .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+        ConfigOptions.key("lookup.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if lookup database failed.");

Review comment:
       The three options should not be tied to `LOOKUP_ASYNC`. If we plan to support them, we also need to support caching in `HBaseRowDataLookupFunction`; otherwise these options have no effect when the user does not enable `LOOKUP_ASYNC`.
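   
   For reference, honoring these options in the sync function would mean the same guard/populate pattern the async function uses. A sketch of what `HBaseRowDataLookupFunction#eval` could look like (the sync method body is not shown in this diff, so the surrounding calls are assumptions; `convertToNewRow` is the thread-safe variant proposed earlier in this thread):
   
   ```java
   public void eval(Object rowKey) throws IOException {
       if (cache != null) {
           RowData cached = cache.getIfPresent(rowKey);
           if (cached != null) {
               // arity 0 marks a cached miss
               if (cached.getArity() > 0) {
                   collect(cached);
               }
               return;
           }
       }
       Result result = table.get(serde.createGet(rowKey));
       if (result.isEmpty()) {
           if (cache != null) {
               cache.put(rowKey, new GenericRowData(0));
           }
       } else {
           RowData row = serde.convertToNewRow(result);
           if (cache != null) {
               cache.put(rowKey, row);
           }
           collect(row);
       }
   }
   ```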

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {
+                        feature.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                        feature.complete(Collections.singletonList(rowData));
+                        if (cache != null){
+                            cache.put(rowKey, rowData);
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                    .recordStats()
+                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(cacheMaxSize)
+                    .build();
+            if (cache != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {
+            try {
+                asyncConnection.close();
+                asyncConnection = null;
+            } catch (IOException e) {
+                // ignore exception when close.
+                LOG.warn("exception when close connection", e);
+            }
+        }
+        LOG.info("end close.");
+    }
+
+    @VisibleForTesting
+    public String getHTableName() {
+        return hTableName;
+    }
+
+
+}

Review comment:
       I think we should add a unit test for this class, just like `JdbcRowDataLookupFunctionTest`, and an ITCase in HBaseConnectorITCase as well.

##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -172,6 +172,34 @@ Connector Options
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
     </tr>
+    <tr>
+      <td><h5>lookup.async</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether async lookup are supported.If true,the lookup will be async.Note,async only supports hbase-2.2 connector.</td>

Review comment:
       Please note the doc format: punctuation like commas and periods should be followed by a space. You can reference the JDBC doc page.
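   
   For example, the description cell would read as follows (spacing fixed, and "are" corrected to "is"):
   
   ```html
   <td>Whether async lookup is enabled. If true, the lookup will be async. Note, async only supports hbase-2.2 connector.</td>
   ```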

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {
+                        feature.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                        feature.complete(Collections.singletonList(rowData));
+                        if (cache != null){
+                            cache.put(rowKey, rowData);
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void open(FunctionContext context) {

Review comment:
       We can keep the function order consistent with the parent class to make the code more readable.

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){

Review comment:
       feature -> resultFuture

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of the lookup function.
+     * @param feature the future to complete with the lookup result or an exception.
+     * @param rowKey the lookup key. Currently only a single rowkey is supported.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Executes the asynchronous fetch of the lookup result.
+     * @param feature the future to complete with the lookup result or an exception.
+     * @param currentRetry the current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {
+                        feature.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                        feature.complete(Collections.singletonList(rowData));
+                        if (cache != null){
+                            cache.put(rowKey, rowData);
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                    .recordStats()
+                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(cacheMaxSize)
+                    .build();
+            if (cache != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {
+            try {
+                asyncConnection.close();
+                asyncConnection = null;
+            } catch (IOException e) {
+                // ignore exception when close.
+                LOG.warn("exception when close connection", e);
+            }
+        }
+        LOG.info("end close.");
+    }
+
+    @VisibleForTesting
+    public String getHTableName() {
+        return hTableName;
+    }
+
+

Review comment:
       redundant blank line

##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -172,6 +172,34 @@ Connector Options
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
     </tr>
+    <tr>
+      <td><h5>lookup.async</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether async lookup is supported. If true, the lookup will be async. Note, async only supports hbase-2.2 connector.</td>

Review comment:
       > async only supports hbase-2.2 connector
   
   It would be great if we could throw an exception in the `hbase-1.4` connector if a user tries to use this feature
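
   For example, a guard like the following could be added where the hbase-1.4 factory validates its options. This is a minimal sketch, not the merged implementation: it assumes the `LOOKUP_ASYNC` option introduced in this PR and a `helper` variable of type `FactoryUtil.TableFactoryHelper` that such factories typically use, so the exact names may differ in the real code:

   ```java
   // Hypothetical validation in the hbase-1.4 factory: reject the async
   // option, which only the hbase-2.2 connector implements.
   ReadableConfig tableOptions = helper.getOptions();
   if (tableOptions.get(HBaseOptions.LOOKUP_ASYNC)) {
       throw new ValidationException(
               "'lookup.async' = 'true' is not supported by the hbase-1.4 connector, "
                       + "please use the hbase-2.2 connector instead.");
   }
   ```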

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function; it can be used
+ * in the Table API and is also useful for temporal table join plans in SQL. It looks up the
+ * result as {@link RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of the lookup function.
+     * @param feature the future to complete with the lookup result or an exception.
+     * @param rowKey the lookup key. Currently only a single rowkey is supported.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Executes the asynchronous fetch of the lookup result.
+     * @param feature the future to complete with the lookup result or an exception.
+     * @param currentRetry the current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {

Review comment:
                           if (result.isEmpty()) {
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r579195220



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
                                     + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+        ConfigOptions.key("lookup.async")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("whether to set async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+        ConfigOptions.key("lookup.cache.max-rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription(
+                "the max number of rows of lookup cache, over this value, the oldest rows will "
+                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                    + "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+        ConfigOptions.key("lookup.cache.ttl")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(0))
+            .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+        ConfigOptions.key("lookup.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if lookup database failed.");

Review comment:
       Thanks, I will do that.
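
   For illustration, a table using these new options could be declared like this (a hypothetical sketch; the table name, schema, and ZooKeeper quorum are made-up values for the example):

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class LookupOptionsExample {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
           // Register a dimension table that uses the lookup options from this PR.
           tEnv.executeSql(
                   "CREATE TABLE hbase_dim (\n"
                           + "  rowkey STRING,\n"
                           + "  family1 ROW<col1 STRING>,\n"
                           + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                           + ") WITH (\n"
                           + "  'connector' = 'hbase-2.2',\n"
                           + "  'table-name' = 'dim',\n"
                           + "  'zookeeper.quorum' = 'localhost:2181',\n"
                           + "  'lookup.async' = 'true',\n"
                           + "  'lookup.cache.max-rows' = '10000',\n"
                           + "  'lookup.cache.ttl' = '10 min',\n"
                           + "  'lookup.max-retries' = '3'\n"
                           + ")");
       }
   }
   ```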




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r587506655



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -225,6 +217,13 @@ public Get createGet(Object rowKey) {
 
     /** Converts HBase {@link Result} into {@link RowData}. */
     public RowData convertToRow(Result result) {
+        // The output row needs to be initialized each time

Review comment:
       Both of the above ideas make sense to me; just choose whichever you prefer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r589323153



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
       > Could we add an ITCase for `AsyncLookupFunction` like `HBaseConnectorITCase.testHBaseLookupTableSource`?
   
   @leonardBang I tried to run HBaseConnectorITCase.testHBaseLookupTableSource, but an exception was raised. Is this a bug?
   Detailed exception information:
   ```
   java.lang.NoClassDefFoundError: com.google.common.base.MoreObjects
       at org.apache.calcite.config.CalciteSystemProperty.loadProperties(CalciteSystemProperty.java:404)
       at org.apache.calcite.config.CalciteSystemProperty.<clinit>(CalciteSystemProperty.java:47)
       at org.apache.calcite.util.Util.<clinit>(Util.java:152)
       at org.apache.calcite.sql.type.SqlTypeName.<clinit>(SqlTypeName.java:142)
       at org.apache.calcite.sql.type.SqlTypeFamily.getTypeNames(SqlTypeFamily.java:163)
       at org.apache.calcite.sql.type.ReturnTypes.<clinit>(ReturnTypes.java:127)
       at org.apache.calcite.sql.SqlSetOperator.<init>(SqlSetOperator.java:45)
       at org.apache.calcite.sql.fun.SqlStdOperatorTable.<clinit>(SqlStdOperatorTable.java:97)
       at org.apache.calcite.sql2rel.StandardConvertletTable.<init>(StandardConvertletTable.java:101)
       at org.apache.calcite.sql2rel.StandardConvertletTable.<clinit>(StandardConvertletTable.java:91)
       at org.apache.calcite.tools.Frameworks$ConfigBuilder.<init>(Frameworks.java:234)
       at org.apache.calcite.tools.Frameworks$ConfigBuilder.<init>(Frameworks.java:215)
       at org.apache.calcite.tools.Frameworks.newConfigBuilder(Frameworks.java:199)
       at org.apache.flink.table.planner.delegation.PlannerContext.createFrameworkConfig(PlannerContext.java:135)
       at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:115)
       at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
       at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:49)
       at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:48)
       at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:143)
       at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:113)
       at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:85)
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-798913741


   > I have rebased and squashed the branch to trigger the build again. Will merge it once the build has passed.
   
   Thank you very much!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r578888288



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
                                     + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+        ConfigOptions.key("lookup.async")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("whether to set async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+        ConfigOptions.key("lookup.cache.max-rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription(
+                "the max number of rows of lookup cache, over this value, the oldest rows will "
+                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                    + "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+        ConfigOptions.key("lookup.cache.ttl")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(0))
+            .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+        ConfigOptions.key("lookup.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if lookup database failed.");

Review comment:
       @leonardBang Hello, can I support cache for HBaseRowDataLookupFunction in the current patch?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r571761491



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
                                     + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+        ConfigOptions.key("lookup.async")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("whether to set async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+        ConfigOptions.key("lookup.cache.max-rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription(
+                "the max number of rows of lookup cache, over this value, the oldest rows will "
+                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                    + "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+        ConfigOptions.key("lookup.cache.ttl")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(0))
+            .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+        ConfigOptions.key("lookup.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if lookup database failed.");

Review comment:
       Good idea. Can I support cache for HBaseRowDataLookupFunction in the current patch?
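
   A minimal sketch of how that could look in the synchronous HBaseRowDataLookupFunction's `eval()`, mirroring the Guava cache handling of the async function in this PR. This is an assumption-laden sketch, not the actual patch: it presumes `cache`, `table`, and `serde` fields initialized in `open()` just like in the async class.

   ```java
   // Hypothetical cache-aware eval() for the synchronous lookup function; an
   // empty GenericRowData marks a cached "no result" entry, as in the async
   // version.
   public void eval(Object rowKey) throws IOException {
       if (cache != null) {
           RowData cached = cache.getIfPresent(rowKey);
           if (cached != null) {
               if (cached.getArity() > 0) {
                   collect(cached);
               }
               return;
           }
       }
       Result result = table.get(serde.createGet(rowKey));
       if (result.isEmpty()) {
           if (cache != null) {
               cache.put(rowKey, new GenericRowData(0));
           }
           return;
       }
       RowData row = serde.convertToRow(result);
       if (cache != null) {
           cache.put(rowKey, row);
       }
       collect(row);
   }
   ```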




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762234588


   @anlen321 could you rebase your branch to the latest master to resolve the conflicts?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12181",
       "triggerID" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192",
       "triggerID" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409",
       "triggerID" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7aa55d73e52cd7e010fb5370bdc4f3cf9833133",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13653",
       "triggerID" : "c7aa55d73e52cd7e010fb5370bdc4f3cf9833133",
       "triggerType" : "PUSH"
     }, {
       "hash" : "063d767c14cfdff548b7eb428cae3a6a9f9e236e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13882",
       "triggerID" : "063d767c14cfdff548b7eb428cae3a6a9f9e236e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0375eb67f1ccb8460862baea733795874ba102f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14352",
       "triggerID" : "0375eb67f1ccb8460862baea733795874ba102f3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0375eb67f1ccb8460862baea733795874ba102f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14352) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12181",
       "triggerID" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192",
       "triggerID" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409",
       "triggerID" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7aa55d73e52cd7e010fb5370bdc4f3cf9833133",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13653",
       "triggerID" : "c7aa55d73e52cd7e010fb5370bdc4f3cf9833133",
       "triggerType" : "PUSH"
     }, {
       "hash" : "063d767c14cfdff548b7eb428cae3a6a9f9e236e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13882",
       "triggerID" : "063d767c14cfdff548b7eb428cae3a6a9f9e236e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0375eb67f1ccb8460862baea733795874ba102f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14352",
       "triggerID" : "0375eb67f1ccb8460862baea733795874ba102f3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c34d423032cb900ffad161afdb7f0b14fe7a824a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c34d423032cb900ffad161afdb7f0b14fe7a824a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0375eb67f1ccb8460862baea733795874ba102f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14352) 
   * c34d423032cb900ffad161afdb7f0b14fe7a824a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12181",
       "triggerID" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192",
       "triggerID" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409",
       "triggerID" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7aa55d73e52cd7e010fb5370bdc4f3cf9833133",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13653",
       "triggerID" : "c7aa55d73e52cd7e010fb5370bdc4f3cf9833133",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f26efadf325bb52d3bc8b2711fc83a169ad61ca5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409) 
   * c7aa55d73e52cd7e010fb5370bdc4f3cf9833133 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13653) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12181",
       "triggerID" : "a33cf2c3a6e8d773eea6746e2c5adc2b3093a047",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192",
       "triggerID" : "5ddc226c2aa4fd063cc173a65eb887f9ac0247ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f26efadf325bb52d3bc8b2711fc83a169ad61ca5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5ddc226c2aa4fd063cc173a65eb887f9ac0247ba Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192) 
   * f26efadf325bb52d3bc8b2711fc83a169ad61ca5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 closed pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 closed pull request #14684:
URL: https://github.com/apache/flink/pull/14684


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r589909417



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
       HBaseConnectorITCase cannot run directly in the IDE because of class conflicts; we can use `mvn` to test it, try:
    ```
    cd flink-connector-hbase-2.2
    mvn test -Dtest=org.apache.flink.connector.hbase.HBaseConnectorITCase -Dcheckstyle.skip=true
    ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592279270



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function; it can be used
+ * in the Table API and is also useful for temporal table join plans in SQL. It looks up the
+ * result as {@link RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                .recordStats()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+            if (cache != null && context != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    /**
+     * The invoke entry point of the lookup function.
+     * @param feature the future to complete with the lookup result or an exception.
+     * @param rowKey the lookup key. Currently only a single rowkey is supported.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Executes the asynchronous fetch of the lookup result.
+     * @param resultFuture the future to complete with the lookup result or an exception.
+     * @param currentRetry the current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> resultFuture, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> responseFuture = table.get(get);
+        responseFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        resultFuture.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("HBase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            resultFuture.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                resultFuture.completeExceptionally(e1);
+                            }
+                            fetchResult(resultFuture, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    if (result.isEmpty()) {
+                        resultFuture.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        if (cache != null){
+                            RowData rowData = serde.convertToRow(result, false);
+                            resultFuture.complete(Collections.singletonList(rowData));
+                            cache.put(rowKey, rowData);
+                        } else {
+                            resultFuture.complete(Collections.singletonList(serde.convertToRow(result, true)));
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {

Review comment:
       I will close it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r587208036



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -225,6 +217,13 @@ public Get createGet(Object rowKey) {
 
     /** Converts HBase {@link Result} into {@link RowData}. */
     public RowData convertToRow(Result result) {
+        // The output row needs to be initialized each time

Review comment:
       Could we add a `reuse` flag to this function so that we can improve performance when the user has not enabled the cache, or when it is used in the HBaseRowDataInputFormat?
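
   A minimal sketch of the idea (an illustrative standalone class, not the actual HBaseSerde; the field layout is simplified to the rowkey only):

   ```java
   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.data.StringData;

   import org.apache.hadoop.hbase.client.Result;

   /** Sketch of convertToRow with a reuse flag, simplified to the rowkey field. */
   class ReuseFlagSketch {
       private static final int FIELD_LENGTH = 1;
       private final GenericRowData reusedRow = new GenericRowData(FIELD_LENGTH);

       RowData convertToRow(Result result, boolean reuse) {
           // Reuse the pre-allocated row on hot paths (no cache / input format);
           // allocate a fresh row when the caller will hold on to it, e.g. to
           // store it in the lookup cache.
           GenericRowData row = reuse ? reusedRow : new GenericRowData(FIELD_LENGTH);
           row.setField(0, StringData.fromBytes(result.getRow()));
           return row;
       }
   }
   ```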

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
       Could we add an ITCase for `AsyncLookupFunction` like `HBaseConnectorITCase.testHBaseLookupTableSource`?







[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * a33cf2c3a6e8d773eea6746e2c5adc2b3093a047 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12181) 
   * 5ddc226c2aa4fd063cc173a65eb887f9ac0247ba UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r590317782



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
       Thank you very much!







[GitHub] [flink] flinkbot commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * a33cf2c3a6e8d773eea6746e2c5adc2b3093a047 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 5ddc226c2aa4fd063cc173a65eb887f9ac0247ba Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192) 
   * f26efadf325bb52d3bc8b2711fc83a169ad61ca5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12409) 
   





[GitHub] [flink] wuchong commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592279152



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function. It can be used
+ * in the Table API and is also useful for temporal table join plans in SQL. It looks up the
+ * result as {@link RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral,
+            HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());

Review comment:
       The number of CPU cores may be very large depending on the machine. I would suggest using a hard-coded 16 as the example. We can parameterize it in the future if needed.
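
   A hedged sketch of how `open()` could wire that in; the constant name and the shutdown note are illustrative, and java.util.concurrent.Executors replaces Flink's directExecutor here:

       private static final int THREAD_POOL_SIZE = 16;
       private transient ExecutorService threadPool;

       @Override
       public void open(FunctionContext context) {
           // ... create asyncConnection exactly as before ...
           threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
           table = asyncConnection.getTable(TableName.valueOf(hTableName), threadPool);
       }

       // close() would then also call threadPool.shutdown() in addition to
       // closing the asyncConnection.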







[GitHub] [flink] leonardBang commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r579017817



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
                                     + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+        ConfigOptions.key("lookup.async")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("whether to set async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+        ConfigOptions.key("lookup.cache.max-rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription(
+                "the max number of rows of lookup cache, over this value, the oldest rows will "
+                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                    + "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+        ConfigOptions.key("lookup.cache.ttl")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(0))
+            .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+        ConfigOptions.key("lookup.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if lookup database failed.");

Review comment:
       I think we could; we can put them in two commits, one to support the cache and the other to support `HBaseRowDataLookupFunction`.
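
   For reference, a hedged sketch of how the lookup options in this diff would surface in a table DDL; the connector identifier, table name, quorum, and option values below are illustrative only:

       TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
       tEnv.executeSql(
               "CREATE TABLE hbase_dim (rowkey STRING, f1 ROW<q1 STRING>) WITH ("
                       + " 'connector' = 'hbase-2.2',"
                       + " 'table-name' = 'hbase_dim',"
                       + " 'zookeeper.quorum' = 'localhost:2181',"
                       + " 'lookup.async' = 'true',"
                       + " 'lookup.cache.max-rows' = '1000',"
                       + " 'lookup.cache.ttl' = '10 min',"
                       + " 'lookup.max-retries' = '3')");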







[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 5ddc226c2aa4fd063cc173a65eb887f9ac0247ba Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12192) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * c34d423032cb900ffad161afdb7f0b14fe7a824a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14610) 
   





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-793815842


   Hi @leonardBang, thanks for your comments, I've addressed them. Could you help review this?





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * c7aa55d73e52cd7e010fb5370bdc4f3cf9833133 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13653) 
   * 063d767c14cfdff548b7eb428cae3a6a9f9e236e UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * c7aa55d73e52cd7e010fb5370bdc4f3cf9833133 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13653) 
   





[GitHub] [flink] flinkbot commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762209126


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit a33cf2c3a6e8d773eea6746e2c5adc2b3093a047 (Mon Jan 18 12:08:07 UTC 2021)
   
    ✅no warnings
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592276055



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function. It can be used
+ * in the Table API and is also useful for temporal table join plans in SQL. It looks up the
+ * result as {@link RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral,
+            HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());

Review comment:
       @wuchong Thanks for your comments, I quite agree with you. But how many threads do we need to define?
   I think it is an IO-bound job, so we could set threadNum = CPU cores * 2.
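
   As a sketch, the sizing proposed here would be (purely illustrative):

       int threadNum = Runtime.getRuntime().availableProcessors() * 2;
       ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);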







[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 063d767c14cfdff548b7eb428cae3a6a9f9e236e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13882) 
   * 0375eb67f1ccb8460862baea733795874ba102f3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14352) 
   





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-766091231


   > I'll take a look this PR, @wuchong .
   > 
   > @anlen321 Looks like you fixed the conflicts with `git merge` command, it's recommended to use `git rebase master` command to fix the conflicts, could you move your change to single commit and then we will help review? Currently your two commits are messed up with others', it's hard to read.
   > ![image](https://user-images.githubusercontent.com/5163645/105036057-ea740b80-5a96-11eb-858f-23f488fa80ef.png)
   
   





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * c34d423032cb900ffad161afdb7f0b14fe7a824a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14610) 
   * 3ebebb69b4975c436a8980b5052a0127e5806b94 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14625) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 3ebebb69b4975c436a8980b5052a0127e5806b94 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14625) 
   





[GitHub] [flink] wuchong commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762573217


   Hi @leonardBang, could you help review this?





[GitHub] [flink] wuchong commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-798912565


   I have rebased and squashed the branch to trigger the build again. Will merge it once the build passes.





[GitHub] [flink] wuchong commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-799044635


   The failed test is not related to this PR. Will merge it. 





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-798900149


   Hi @wuchong, thanks for your comments, I've addressed them. Could you help review this?





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-763700505


   > > @leonardBang My current approach to conflict is not friendly and resulted in the inclusion of other commits. So I want to create a new branch and apply for a new pull request. Do you think it is OK?
   > 
   > You don't need to raise up a new PR, you can prepare your new branch(e.g: `branch_a`) which based on latest master and appended with a single commit that contains your change, and then (1) delete the `anlen321:FLINK-20460` in your local `git branch -D FLINK-20460`, (3) checkout a new branch named `FLINK-20460` from the `branch_a`,(4) push the `FLINK-20460` branch to remote `git push origin FLINK-20460`, the PR will automatically update
   
   Thank you! This method sounds good, I will do it your way.





[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * a33cf2c3a6e8d773eea6746e2c5adc2b3093a047 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12181) 
   





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-770858312


    @wuchong @leonardBang could you help me review this?





[GitHub] [flink] anlen321 removed a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 removed a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-766091231


   > I'll take a look this PR, @wuchong .
   > 
   > @anlen321 Looks like you fixed the conflicts with `git merge` command, it's recommended to use `git rebase master` command to fix the conflicts, could you move your change to single commit and then we will help review? Currently your two commits are messed up with others', it's hard to read.
   > ![image](https://user-images.githubusercontent.com/5163645/105036057-ea740b80-5a96-11eb-858f-23f488fa80ef.png)
   
   





[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r587463451



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -225,6 +217,13 @@ public Get createGet(Object rowKey) {
 
     /** Converts HBase {@link Result} into {@link RowData}. */
     public RowData convertToRow(Result result) {
+        // The output row needs to be initialized each time

Review comment:
       @leonardBang Sounds good. I think adding a `reuse` flag to this function will complicate the method.
   Maybe we could add a method specifically for the cache. Or do you have a better idea?
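
   A hedged sketch of that alternative, with hypothetical method names and the existing decoding factored into a shared helper:

       /** Reuses a pre-allocated row; fine when the result is consumed immediately. */
       public RowData convertToReusedRow(Result result) {
           return fillRow(reusedRow, result);
       }

       /** Allocates a fresh row per call; safe to store in the lookup cache. */
       public RowData convertToNewRow(Result result) {
           return fillRow(new GenericRowData(reusedRow.getArity()), result);
       }

       private RowData fillRow(GenericRowData row, Result result) {
           // ... existing decoding logic, writing into 'row' ...
           return row;
       }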







[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592324220



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());

Review comment:
       Good, I will finish it.







[GitHub] [flink] leonardBang commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-763383818


   > @leonardBang My current approach to resolving the conflicts was not clean and resulted in the inclusion of other commits. So I want to create a new branch and open a new pull request. Do you think that is OK?
   
   You don't need to open a new PR. You can prepare a new branch (e.g. `branch_a`) based on the latest master with a single commit that contains your change, and then (1) delete your local `FLINK-20460` branch with `git branch -D FLINK-20460`, (2) check out a new branch named `FLINK-20460` from `branch_a`, (3) push the `FLINK-20460` branch to the remote with `git push origin FLINK-20460`, and the PR will update automatically.





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-763327328


   
   @leonardBang My current approach to resolving the conflicts was not clean and resulted in the inclusion of other commits. So I want to create a new branch and open a new pull request. Do you think that is OK?
   
   





[GitHub] [flink] anlen321 commented on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-787301167


   Hi @leonardBang, thanks for your comments; I've addressed them. Could you help review this?





[GitHub] [flink] leonardBang commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r570712863



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
##########
@@ -35,17 +42,45 @@ public HBaseDynamicTableSource(
             Configuration conf,
             String tableName,
             HBaseTableSchema hbaseSchema,
-            String nullStringLiteral) {
-        super(conf, tableName, hbaseSchema, nullStringLiteral);
+            String nullStringLiteral,
+            HBaseLookupOptions lookupOptions) {
+        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        checkArgument(context.getKeys().length == 1 && context.getKeys()[0].length == 1,
+            "Currently, HBase table can only be lookup by single rowkey.");
+        checkArgument(
+            hbaseSchema.getRowKeyName().isPresent(),
+            "HBase schema must have a row key when used in lookup mode.");
+        checkArgument(
+            hbaseSchema
+                .convertsToTableSchema()
+                .getTableColumn(context.getKeys()[0][0])
+                .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
+                .isPresent(),
+            "Currently, HBase table only supports lookup by rowkey field.");
+        boolean isAsync = lookupOptions.getLookupAsync();
+        if (isAsync){

Review comment:
       ```suggestion
           if (lookupOptions.getLookupAsync()){
   ```

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+

Review comment:
       redundant blank lines

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
                                     + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+        ConfigOptions.key("lookup.async")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("whether to set async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+        ConfigOptions.key("lookup.cache.max-rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription(
+                "the max number of rows of lookup cache, over this value, the oldest rows will "
+                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                    + "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+        ConfigOptions.key("lookup.cache.ttl")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(0))
+            .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+        ConfigOptions.key("lookup.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if lookup database failed.");

Review comment:
       The three options should not be tied to `LOOKUP_ASYNC`. If we plan to support them, we also need to support the cache in `HBaseRowDataLookupFunction` (see the sketch below); otherwise these options have no effect when the user has not enabled `LOOKUP_ASYNC`.
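   As an illustration of this point (a hedged sketch, not the PR's code; field names mirror the async function above), the synchronous `HBaseRowDataLookupFunction#eval` could consult the same Guava cache before issuing a blocking `Get`. Retry handling is omitted for brevity:
   ```java
   // Hedged sketch: cache-first synchronous lookup. Assumes the sync function holds
   // the same cache/table/serde fields as HBaseRowDataAsyncLookupFunction.
   public void eval(Object rowKey) throws IOException {
       if (cache != null) {
           RowData cached = cache.getIfPresent(rowKey);
           if (cached != null) {
               collect(cached);
               return;
           }
       }
       Result result = table.get(serde.createGet(rowKey));
       if (!result.isEmpty()) {
           // the cached row must not be a reused instance, cf. the convertToNewRow
           // sketch earlier in this thread (a hypothetical cache-safe variant)
           RowData row = serde.convertToNewRow(result);
           if (cache != null) {
               cache.put(rowKey, row);
           }
           collect(row);
       }
   }
   ```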

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {
+                        feature.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                        feature.complete(Collections.singletonList(rowData));
+                        if (cache != null){
+                            cache.put(rowKey, rowData);
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                    .recordStats()
+                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(cacheMaxSize)
+                    .build();
+            if (cache != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {
+            try {
+                asyncConnection.close();
+                asyncConnection = null;
+            } catch (IOException e) {
+                // ignore exception when close.
+                LOG.warn("exception when close connection", e);
+            }
+        }
+        LOG.info("end close.");
+    }
+
+    @VisibleForTesting
+    public String getHTableName() {
+        return hTableName;
+    }
+
+
+}

Review comment:
       I think we should add a unit test for this class, just like `JdbcRowDataLookupFunctionTest`, and an ITCase in `HBaseConnectorITCase` as well. A rough shape is sketched below.
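   For illustration, a hedged skeleton of such a test (the `createLookupFunction` helper, the INT rowkey, and the expected row count are assumptions; a real test would reuse the HBase test cluster that `HBaseConnectorITCase` sets up):
   ```java
   // Hedged test skeleton: drives eval() directly and blocks on the returned future.
   @Test
   public void testAsyncLookupByRowKey() throws Exception {
       HBaseRowDataAsyncLookupFunction lookupFunction = createLookupFunction(); // assumed helper
       lookupFunction.open(null); // passing null only works while the cache is disabled
       CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
       lookupFunction.eval(future, 1); // assumed INT rowkey in the test schema
       Collection<RowData> rows = future.get(30, TimeUnit.SECONDS);
       assertEquals(1, rows.size());
       lookupFunction.close();
   }
   ```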

##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -172,6 +172,34 @@ Connector Options
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
     </tr>
+    <tr>
+      <td><h5>lookup.async</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether async lookup are supported.If true,the lookup will be async.Note,async only supports hbase-2.2 connector.</td>

Review comment:
       Please note that in the docs a comma or period should be followed by a space; you can reference the JDBC doc page for the expected format.

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {
+                        feature.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                        feature.complete(Collections.singletonList(rowData));
+                        if (cache != null){
+                            cache.put(rowKey, rowData);
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void open(FunctionContext context) {

Review comment:
       We can keep the function order consistent with the parent class to make the code more readable.

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){

Review comment:
       feature -> resultFuture

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param feature The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, throwable);
+                        feature.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("Hbase asyncLookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            feature.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                feature.completeExceptionally(e1);
+                            }
+                            fetchResult(feature, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {
+                        feature.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                        feature.complete(Collections.singletonList(rowData));
+                        if (cache != null){
+                            cache.put(rowKey, rowData);
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+            LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : CacheBuilder.newBuilder()
+                    .recordStats()
+                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(cacheMaxSize)
+                    .build();
+            if (cache != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {
+            try {
+                asyncConnection.close();
+                asyncConnection = null;
+            } catch (IOException e) {
+                // ignore exception when close.
+                LOG.warn("exception when close connection", e);
+            }
+        }
+        LOG.info("end close.");
+    }
+
+    @VisibleForTesting
+    public String getHTableName() {
+        return hTableName;
+    }
+
+

Review comment:
       redundant blank line

##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -172,6 +172,34 @@ Connector Options
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
     </tr>
+    <tr>
+      <td><h5>lookup.async</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether async lookup are supported.If true,the lookup will be async.Note,async only supports hbase-2.2 connector.</td>

Review comment:
       > async only supports hbase-2.2 connector
   
   It would be great if we could throw an exception with a clear message in the `hbase-1.4` connector if a user tries to use this feature, e.g. as sketched below.
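   A hedged sketch of that fail-fast check (placement and wording are assumptions; `getLookupAsync()` comes from the `HBaseLookupOptions` shown in the diff):
   ```java
   // Hedged sketch for the hbase-1.4 table source/factory: reject the async option
   // early instead of silently ignoring it, since only hbase-2.2 supports async lookup.
   if (lookupOptions.getLookupAsync()) {
       throw new UnsupportedOperationException(
               "Async lookup ('lookup.async' = 'true') is only supported by the "
                       + "hbase-2.2 connector; the hbase-1.4 connector only supports sync lookup.");
   }
   ```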

##########
File path: flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Executes the async fetch, retrying on failure up to maxRetryTimes.
+     *
+     * @param future the future to complete with the looked-up rows (or exceptionally on failure)
+     * @param currentRetry the current number of retries
+     * @param rowKey the lookup key
+     */
+    private void fetchResult(
+            CompletableFuture<Collection<RowData>> future, int currentRetry, Object rowKey) {
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> resultFuture = table.get(get);
+        resultFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found.", hTableName, throwable);
+                        future.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName + "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("HBase async lookup error, retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            future.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                // linear backoff: wait currentRetry seconds before the next attempt
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                future.completeExceptionally(e1);
+                            }
+                            fetchResult(future, currentRetry + 1, rowKey);
+                        }
+                    }
+                } else {
+                } else {
+                    boolean flag = result.isEmpty();
+                    if (flag) {

Review comment:
                           if (result.isEmpty()) {
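
   As an illustration, here is a minimal sketch of how the rest of this branch could look once the
   suggested simplification is applied. The conversion call, the zero-arity "empty row" marker, and
   the cache population are assumptions inferred from the fields and the cache check in eval(), not
   the PR's exact code (GenericRowData comes from org.apache.flink.table.data):

       if (result.isEmpty()) {
           // Cache a zero-arity row as a marker for "no row found" so repeated
           // lookups of a missing key skip the HBase round trip; eval() treats
           // getArity() == 0 as an empty result.
           if (cache != null) {
               cache.put(rowKey, new GenericRowData(0));
           }
           future.complete(Collections.emptyList());
       } else {
           // Assumed conversion: the serde is expected to turn the HBase Result
           // into a fresh RowData that is safe to store in the cache.
           RowData row = serde.convertToRow(result);
           if (cache != null) {
               cache.put(rowKey, row);
           }
           future.complete(Collections.singletonList(row));
       }

   Relatedly, the construction of the transient cache is not visible in this excerpt. A plausible
   sketch of how it could be built from the lookup options (for example in open()), assuming that
   a non-positive size or TTL disables caching, would be:

       // Guava cache keyed by rowkey; entries expire cacheExpireMs after write.
       this.cache =
               cacheMaxSize <= 0 || cacheExpireMs <= 0
                       ? null
                       : CacheBuilder.newBuilder()
                               .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                               .maximumSize(cacheMaxSize)
                               .build();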
   






[GitHub] [flink] flinkbot edited a comment on pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14684:
URL: https://github.com/apache/flink/pull/14684#issuecomment-762224447


   ## CI report:
   
   * 0375eb67f1ccb8460862baea733795874ba102f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14352) 
   * c34d423032cb900ffad161afdb7f0b14fe7a824a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=14610) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

Posted by GitBox <gi...@apache.org>.
anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r589323153



##########
File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
       > Could we add an ITCase for `AsyncLookupFunction` like `HBaseConnectorITCase.testHBaseLookupTableSource` ?
   
   @leonardBang I tried to run this method, but an exception was raised. Is this a bug?
   Detailed exception information:
   `java.lang.NoClassDefFoundError: com.google.common.base.MoreObjects
   	at org.apache.calcite.config.CalciteSystemProperty.loadProperties(CalciteSystemProperty.java:404)
   	at org.apache.calcite.config.CalciteSystemProperty.<clinit>(CalciteSystemProperty.java:47)
   	at org.apache.calcite.util.Util.<clinit>(Util.java:152)
   	at org.apache.calcite.sql.type.SqlTypeName.<clinit>(SqlTypeName.java:142)
   	at org.apache.calcite.sql.type.SqlTypeFamily.getTypeNames(SqlTypeFamily.java:163)
   	at org.apache.calcite.sql.type.ReturnTypes.<clinit>(ReturnTypes.java:127)
   	at org.apache.calcite.sql.SqlSetOperator.<init>(SqlSetOperator.java:45)
   	at org.apache.calcite.sql.fun.SqlStdOperatorTable.<clinit>(SqlStdOperatorTable.java:97)
   	at org.apache.calcite.sql2rel.StandardConvertletTable.<init>(StandardConvertletTable.java:101)
   	at org.apache.calcite.sql2rel.StandardConvertletTable.<clinit>(StandardConvertletTable.java:91)
   	at org.apache.calcite.tools.Frameworks$ConfigBuilder.<init>(Frameworks.java:234)
   	at org.apache.calcite.tools.Frameworks$ConfigBuilder.<init>(Frameworks.java:215)
   	at org.apache.calcite.tools.Frameworks.newConfigBuilder(Frameworks.java:199)
   	at org.apache.flink.table.planner.delegation.PlannerContext.createFrameworkConfig(PlannerContext.java:135)
   	at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:115)
   	at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
   	at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:49)
   	at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:48)
   	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:143)
   	at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:113)
   	at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:85)`
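
   For context: `com.google.common.base.MoreObjects` lives in unshaded Guava, and the stack trace
   shows Calcite's `CalciteSystemProperty` triggering the load, so the failure points at plain
   Guava missing from the test classpath rather than at the connector change itself. A one-line
   probe (illustrative only, e.g. at the top of the failing test) that fails the same way when
   unshaded Guava is absent:

       // Throws ClassNotFoundException when unshaded Guava is missing from the
       // test classpath, mirroring the NoClassDefFoundError above.
       Class.forName("com.google.common.base.MoreObjects");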
   



