Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/10 04:33:40 UTC

[flink] branch master updated: [FLINK-12955][hbase] Support LookupableTableSource for HBase

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a1636b1  [FLINK-12955][hbase] Support LookupableTableSource for HBase
a1636b1 is described below

commit a1636b1e3b05fb366a9e25a23b555648860d9223
Author: qiangsi.lq <qi...@alibaba-inc.com>
AuthorDate: Sun Jun 30 23:24:10 2019 +0800

    [FLINK-12955][hbase] Support LookupableTableSource for HBase
    
    This closes #9045
---
 flink-connectors/flink-hbase/pom.xml               |   7 +
 .../flink/addons/hbase/HBaseLookupFunction.java    | 150 +++++++++++++++
 .../flink/addons/hbase/HBaseRowInputFormat.java    | 163 +----------------
 .../flink/addons/hbase/HBaseTableFactory.java      | 122 +++++++++++++
 .../flink/addons/hbase/HBaseTableSchema.java       | 146 ++++++++++++++-
 .../flink/addons/hbase/HBaseTableSource.java       | 136 ++++++++------
 .../addons/hbase/util/HBaseConfigurationUtil.java  | 100 ++++++++++
 .../flink/addons/hbase/util/HBaseReadHelper.java   | 166 +++++++++++++++++
 .../flink/addons/hbase/util/HBaseTypeUtils.java    | 167 +++++++++++++++++
 .../flink/table/descriptors/HBaseValidator.java    |  51 ++++++
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../addons/hbase/HBaseLookupFunctionITCase.java    | 203 +++++++++++++++++++++
 .../flink/addons/hbase/HBaseTableFactoryTest.java  | 105 +++++++++++
 13 files changed, 1317 insertions(+), 215 deletions(-)

diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index 63580d2..685b4e0 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -288,6 +288,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<profiles>
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
new file mode 100644
index 0000000..938d834
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.addons.hbase.util.HBaseReadHelper;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The HBaseLookupFunction is a standard user-defined table function. It can be used in the
+ * Table API and is also useful for temporal table join plans in SQL.
+ */
+public class HBaseLookupFunction extends TableFunction<Row> {
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
+	private static final long serialVersionUID = 1L;
+
+	private final String hTableName;
+	private final byte[] serializedConfig;
+	private final HBaseTableSchema hbaseTableSchema;
+
+	private transient HBaseReadHelper readHelper;
+	private transient Connection hConnection;
+	private transient HTable table;
+
+	public HBaseLookupFunction(
+			Configuration configuration,
+			String hTableName,
+			HBaseTableSchema hbaseTableSchema) {
+		this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+		this.hTableName = hTableName;
+		this.hbaseTableSchema = hbaseTableSchema;
+	}
+
+	/**
+	 * The invoke entry point of the lookup function.
+	 * @param rowKey the lookup key. Currently only a single rowkey is supported.
+	 */
+	public void eval(Object rowKey) throws IOException {
+		// fetch result
+		Result result = table.get(readHelper.createGet(rowKey));
+		if (!result.isEmpty()) {
+			// parse and collect
+			collect(readHelper.parseToRow(result, rowKey));
+		}
+	}
+
+	@Override
+	public TypeInformation<Row> getResultType() {
+		return hbaseTableSchema.convertsToTableSchema().toRowType();
+	}
+
+	private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() {
+		// create the default configuration from the current runtime env (`hbase-site.xml` in classpath) first,
+		// then overwrite it with the serialized configuration captured from the client-side env.
+		// User params from the client side have the highest priority.
+		org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+			serializedConfig,
+			HBaseConfiguration.create());
+
+		// do validation: check key option(s) in final runtime configuration
+		if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+			LOG.error("Cannot connect to HBase without the {} configuration", HConstants.ZOOKEEPER_QUORUM);
+			throw new IllegalArgumentException("HBase configuration check failed: missing '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+		}
+
+		return runtimeConfig;
+	}
+
+	@Override
+	public void open(FunctionContext context) {
+		LOG.info("start open ...");
+		org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+		try {
+			hConnection = ConnectionFactory.createConnection(config);
+			table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
+		} catch (TableNotFoundException tnfe) {
+			LOG.error("Table '{}' not found ", hTableName, tnfe);
+			throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
+		} catch (IOException ioe) {
+			LOG.error("Exception while creating connection to HBase.", ioe);
+			throw new RuntimeException("Cannot create connection to HBase.", ioe);
+		}
+		this.readHelper = new HBaseReadHelper(hbaseTableSchema);
+		LOG.info("end open.");
+	}
+
+	@Override
+	public void close() {
+		LOG.info("start close ...");
+		if (null != table) {
+			try {
+				table.close();
+				table = null;
+			} catch (IOException e) {
+				// ignore the exception on close.
+				LOG.warn("exception when closing table", e);
+			}
+		}
+		if (null != hConnection) {
+			try {
+				hConnection.close();
+				hConnection = null;
+			} catch (IOException e) {
+				// ignore the exception on close.
+				LOG.warn("exception when closing connection", e);
+			}
+		}
+		LOG.info("end close.");
+	}
+
+	@VisibleForTesting
+	String getHTableName() {
+		return hTableName;
+	}
+}
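
A usage sketch (not part of this commit): because HBaseTableSchema#setRowKey is package-private, user code typically obtains this function through HBaseTableSource#getLookupFunction and registers it like any other TableFunction. The names below ("hTable", "rowkey", "fam1", the "src" table) and the `tableEnv` variable are assumptions for illustration, imports elided.

    Configuration conf = HBaseConfiguration.create();
    HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
    hSrc.setRowKey("rowkey", String.class);
    hSrc.addColumn("fam1", "col1", Integer.class);

    // register the lookup function and join against it as a lateral table
    TableFunction<Row> lookup = hSrc.getLookupFunction(new String[]{"rowkey"});
    tableEnv.registerFunction("hbaseLookup", lookup);
    Table result = tableEnv.sqlQuery(
        "SELECT s.id, h.fam1.col1 FROM src AS s, " +
        "LATERAL TABLE(hbaseLookup(s.id)) AS h");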
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
index dde24f0..fd2a116 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.addons.hbase.util.HBaseReadHelper;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -33,17 +34,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.charset.Charset;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
@@ -53,78 +47,33 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(HBaseRowInputFormat.class);
-	private String tableName;
-	private transient org.apache.hadoop.conf.Configuration conf;
-	private HBaseTableSchema schema;
-	private transient Charset stringCharset;
 
-	// family keys
-	private byte[][] families;
-	// qualifier keys
-	private byte[][][] qualifiers;
-	// qualifier types
-	private int[][] types;
+	private final String tableName;
+	private final HBaseTableSchema schema;
 
-	// row which is returned
-	private Row resultRow;
-	// nested family rows
-	private Row[] familyRows;
+	private transient org.apache.hadoop.conf.Configuration conf;
+	private transient HBaseReadHelper readHelper;
 
 	public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
 		this.tableName = tableName;
 		this.conf = conf;
 		this.schema = schema;
-
-		// set families, qualifiers, and types
-		String[] familyNames = schema.getFamilyNames();
-		this.families = schema.getFamilyKeys();
-		this.qualifiers = new byte[this.families.length][][];
-		this.types = new int[this.families.length][];
-		for (int f = 0; f < families.length; f++) {
-			this.qualifiers[f] = schema.getQualifierKeys(familyNames[f]);
-			TypeInformation[] typeInfos = schema.getQualifierTypes(familyNames[f]);
-			this.types[f] = new int[typeInfos.length];
-			for (int i = 0; i < typeInfos.length; i++) {
-				int typeIdx = getTypeIndex(typeInfos[i].getTypeClass());
-				if (typeIdx >= 0) {
-					types[f][i] = typeIdx;
-				} else {
-					throw new IllegalArgumentException("Unsupported type: " + typeInfos[i]);
-				}
-			}
-		}
 	}
 
 	@Override
 	public void configure(Configuration parameters) {
 		LOG.info("Initializing HBase configuration.");
+		// prepare hbase read helper
+		this.readHelper = new HBaseReadHelper(schema);
 		connectToTable();
 		if (table != null) {
 			scan = getScanner();
 		}
-
-		// prepare output rows
-		this.resultRow = new Row(families.length);
-		this.familyRows = new Row[families.length];
-		for (int f = 0; f < families.length; f++) {
-			this.familyRows[f] = new Row(qualifiers[f].length);
-			this.resultRow.setField(f, this.familyRows[f]);
-		}
-
-		this.stringCharset = Charset.forName(schema.getStringCharset());
 	}
 
 	@Override
 	protected Scan getScanner() {
-		Scan scan = new Scan();
-		for (int f = 0; f < families.length; f++) {
-			byte[] family = families[f];
-			for (int q = 0; q < qualifiers[f].length; q++) {
-				byte[] quantifier = qualifiers[f][q];
-				scan.addColumn(family, quantifier);
-			}
-		}
-		return scan;
+		return readHelper.createScan();
 	}
 
 	@Override
@@ -134,26 +83,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 
 	@Override
 	protected Row mapResultToOutType(Result res) {
-		for (int f = 0; f < this.families.length; f++) {
-			// get family key
-			byte[] familyKey = families[f];
-			Row familyRow = familyRows[f];
-			for (int q = 0; q < this.qualifiers[f].length; q++) {
-				// get quantifier key
-				byte[] qualifier = qualifiers[f][q];
-				// get quantifier type idx
-				int typeIdx = types[f][q];
-				// read value
-				byte[] value = res.getValue(familyKey, qualifier);
-				if (value != null) {
-					familyRow.setField(q, deserialize(value, typeIdx));
-				} else {
-					familyRow.setField(q, null);
-				}
-			}
-			resultRow.setField(f, familyRow);
-		}
-		return resultRow;
+		return readHelper.parseToRow(res);
 	}
 
 	private void connectToTable() {
@@ -186,79 +116,4 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 		}
 		return new RowTypeInfo(typeInfos, famNames);
 	}
-
-	private Object deserialize(byte[] value, int typeIdx) {
-		switch (typeIdx) {
-			case 0: // byte[]
-				return value;
-			case 1:
-				return new String(value, stringCharset);
-			case 2: // byte
-				return value[0];
-			case 3:
-				return Bytes.toShort(value);
-			case 4:
-				return Bytes.toInt(value);
-			case 5:
-				return Bytes.toLong(value);
-			case 6:
-				return Bytes.toFloat(value);
-			case 7:
-				return Bytes.toDouble(value);
-			case 8:
-				return Bytes.toBoolean(value);
-			case 9: // sql.Timestamp encoded as long
-				return new Timestamp(Bytes.toLong(value));
-			case 10: // sql.Date encoded as long
-				return new Date(Bytes.toLong(value));
-			case 11: // sql.Time encoded as long
-				return new Time(Bytes.toLong(value));
-			case 12:
-				return Bytes.toBigDecimal(value);
-			case 13:
-				return new BigInteger(value);
-
-			default:
-				throw new IllegalArgumentException("Unknown type index " + typeIdx);
-		}
-	}
-
-	private static int getTypeIndex(Class<?> clazz) {
-		if (byte[].class.equals(clazz)) {
-			return 0;
-		} else if (String.class.equals(clazz)) {
-			return 1;
-		} else if (Byte.class.equals(clazz)) {
-			return 2;
-		} else if (Short.class.equals(clazz)) {
-			return 3;
-		} else if (Integer.class.equals(clazz)) {
-			return 4;
-		} else if (Long.class.equals(clazz)) {
-			return 5;
-		} else if (Float.class.equals(clazz)) {
-			return 6;
-		} else if (Double.class.equals(clazz)) {
-			return 7;
-		} else if (Boolean.class.equals(clazz)) {
-			return 8;
-		} else if (Timestamp.class.equals(clazz)) {
-			return 9;
-		} else if (Date.class.equals(clazz)) {
-			return 10;
-		} else if (Time.class.equals(clazz)) {
-			return 11;
-		} else if (BigDecimal.class.equals(clazz)) {
-			return 12;
-		} else if (BigInteger.class.equals(clazz)) {
-			return 13;
-		} else {
-			return -1;
-		}
-	}
-
-	static boolean isSupportedType(Class<?> clazz) {
-		return getTypeIndex(clazz) != -1;
-	}
-
 }
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
new file mode 100644
index 0000000..922b26f
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.HBaseValidator;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link HBaseTableSource}.
+ */
+public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
+
+	@Override
+	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+		Configuration hbaseClientConf = HBaseConfiguration.create();
+		String hbaseZk = properties.get(CONNECTOR_HBASE_ZK_QUORUM);
+		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
+		String hTableName = descriptorProperties.getString(CONNECTOR_HBASE_TABLE_NAME);
+		TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
+		HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
+		return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
+	}
+
+	private HBaseTableSchema validateTableSchema(TableSchema schema) {
+		HBaseTableSchema hbaseSchema = new HBaseTableSchema();
+		String[] fieldNames = schema.getFieldNames();
+		TypeInformation[] fieldTypes = schema.getFieldTypes();
+		for (int i = 0; i < fieldNames.length; i++) {
+			String name = fieldNames[i];
+			TypeInformation<?> type = fieldTypes[i];
+			if (type instanceof RowTypeInfo) {
+				RowTypeInfo familyType = (RowTypeInfo) type;
+				String[] qualifierNames = familyType.getFieldNames();
+				TypeInformation[] qualifierTypes = familyType.getFieldTypes();
+				for (int j = 0; j < familyType.getArity(); j++) {
+					hbaseSchema.addColumn(name, qualifierNames[j], qualifierTypes[j].getTypeClass());
+				}
+			} else {
+				hbaseSchema.setRowKey(name, type.getTypeClass());
+			}
+		}
+		return hbaseSchema;
+	}
+
+	private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putProperties(properties);
+		new HBaseValidator().validate(descriptorProperties);
+		return descriptorProperties;
+	}
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); // hbase
+		context.put(CONNECTOR_VERSION, hbaseVersion()); // version
+		context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		List<String> properties = new ArrayList<>();
+
+		properties.add(CONNECTOR_HBASE_TABLE_NAME);
+		properties.add(CONNECTOR_HBASE_ZK_QUORUM);
+
+		// schema
+		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+
+		return properties;
+	}
+
+	private String hbaseVersion() {
+		return CONNECTOR_VERSION_VALUE_143;
+	}
+}
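
A hedged sketch of how this factory is discovered via TableFactoryService (the property values are illustrative; the type strings follow the pre-1.9 TypeStringUtils format, and imports are elided):

    Map<String, String> props = new HashMap<>();
    props.put("connector.type", "hbase");                      // CONNECTOR_TYPE_VALUE_HBASE
    props.put("connector.version", "1.4.3");                   // CONNECTOR_VERSION_VALUE_143
    props.put("connector.property-version", "1");
    props.put("connector.table-name", "testTable");
    props.put("connector.zookeeper.quorum", "localhost:2181");
    props.put("schema.0.name", "rowkey");
    props.put("schema.0.type", "VARCHAR");                     // a non-ROW field becomes the rowkey
    props.put("schema.1.name", "fam1");
    props.put("schema.1.type", "ROW<col1 INT>");               // a ROW field becomes a column family

    StreamTableSource<Row> source = TableFactoryService
        .find(StreamTableSourceFactory.class, props)
        .createStreamTableSource(props);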
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index fee9fa9..af642d1 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -18,23 +18,32 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Helps to specify an HBase Table's schema.
  */
 public class HBaseTableSchema implements Serializable {
 
+	private static final long serialVersionUID = 1L;
+
 	// A Map with key as column family.
 	private final Map<String, Map<String, TypeInformation<?>>> familyMap = new LinkedHashMap<>();
 
+	// information about rowkey
+	private RowKeyInfo rowKeyInfo;
+
 	// charset to parse HBase keys and strings. UTF-8 by default.
 	private String charset = "UTF-8";
 
@@ -51,7 +60,7 @@ public class HBaseTableSchema implements Serializable {
 		Preconditions.checkNotNull(clazz, "class type");
 		Map<String, TypeInformation<?>> qualifierMap = this.familyMap.get(family);
 
-		if (!HBaseRowInputFormat.isSupportedType(clazz)) {
+		if (!HBaseTypeUtils.isSupportedType(clazz)) {
 			// throw exception
 			throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
 				"Better to use byte[].class and deserialize using user defined scalar functions");
@@ -65,6 +74,25 @@ public class HBaseTableSchema implements Serializable {
 	}
 
 	/**
+	 * Sets row key information in the table schema.
+	 * @param rowKeyName the row key field name
+	 * @param clazz the data type of the row key
+	 */
+	void setRowKey(String rowKeyName, Class<?> clazz) {
+		Preconditions.checkNotNull(rowKeyName, "row key field name");
+		Preconditions.checkNotNull(clazz, "row key class type");
+		if (!HBaseTypeUtils.isSupportedType(clazz)) {
+			// throw exception
+			throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
+				"Better to use byte[].class and deserialize using user defined scalar functions");
+		}
+		if (rowKeyInfo != null) {
+			throw new IllegalArgumentException("Row key can't be set multiple times.");
+		}
+		this.rowKeyInfo = new RowKeyInfo(rowKeyName, TypeExtractor.getForClass(clazz), familyMap.size());
+	}
+
+	/**
 	 * Sets the charset for value strings and HBase identifiers.
 	 *
 	 * @param charset the charset for value strings and HBase identifiers.
@@ -78,8 +106,8 @@ public class HBaseTableSchema implements Serializable {
 	 *
 	 * @return The names of all registered column families.
 	 */
-	String[] getFamilyNames() {
-		return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
+	public String[] getFamilyNames() {
+		return this.familyMap.keySet().toArray(new String[0]);
 	}
 
 	/**
@@ -87,7 +115,7 @@ public class HBaseTableSchema implements Serializable {
 	 *
 	 * @return The HBase identifiers of all registered column families.
 	 */
-	byte[][] getFamilyKeys() {
+	public byte[][] getFamilyKeys() {
 		Charset c = Charset.forName(charset);
 
 		byte[][] familyKeys = new byte[this.familyMap.size()][];
@@ -126,7 +154,7 @@ public class HBaseTableSchema implements Serializable {
 	 * @param family The name of the column family for which the column qualifier identifiers are returned.
 	 * @return The HBase identifiers of all registered column qualifiers for a specific column family.
 	 */
-	byte[][] getQualifierKeys(String family) {
+	public byte[][] getQualifierKeys(String family) {
 		Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
 
 		if (qualifierMap == null) {
@@ -148,7 +176,7 @@ public class HBaseTableSchema implements Serializable {
 	 * @param family The name of the column family for which the column qualifier types are returned.
 	 * @return The types of all registered column qualifiers of a specific column family.
 	 */
-	TypeInformation<?>[] getQualifierTypes(String family) {
+	public TypeInformation<?>[] getQualifierTypes(String family) {
 		Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
 
 		if (qualifierMap == null) {
@@ -170,7 +198,7 @@ public class HBaseTableSchema implements Serializable {
 	 * @param family The name of the column family for which the column qualifier names and types are returned.
 	 * @return The names and types of all registered column qualifiers of a specific column family.
 	 */
-	Map<String, TypeInformation<?>> getFamilyInfo(String family) {
+	private Map<String, TypeInformation<?>> getFamilyInfo(String family) {
 		return familyMap.get(family);
 	}
 
@@ -179,8 +207,110 @@ public class HBaseTableSchema implements Serializable {
 	 *
 	 * @return The charset for value strings and HBase identifiers.
 	 */
-	String getStringCharset() {
+	public String getStringCharset() {
 		return this.charset;
 	}
 
+	/**
+	 * Returns field index of row key in the table schema. Returns -1 if row key is not set.
+	 */
+	public int getRowKeyIndex() {
+		return rowKeyInfo == null ? -1 : rowKeyInfo.rowKeyIndex;
+	}
+
+	/**
+	 * Returns the optional type information of the row key. Returns empty if the row key is not set.
+	 */
+	public Optional<TypeInformation<?>> getRowKeyTypeInfo() {
+		return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyType);
+	}
+
+	/**
+	 * Returns the optional row key name.
+	 * The row key name is the field name in the HBase schema which can be queried in Flink SQL.
+	 */
+	Optional<String> getRowKeyName() {
+		return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyName);
+	}
+
+	/**
+	 * Gets a new HBase schema containing only the selected fields.
+	 */
+	HBaseTableSchema getProjectedHBaseTableSchema(int[] projectedFields) {
+		if (projectedFields == null) {
+			return this;
+		}
+		HBaseTableSchema newSchema = new HBaseTableSchema();
+		String[] fieldNames = convertsToTableSchema().getFieldNames();
+		for (int projectedField : projectedFields) {
+			String name = fieldNames[projectedField];
+			if (rowKeyInfo != null && name.equals(rowKeyInfo.rowKeyName)) {
+				newSchema.setRowKey(rowKeyInfo.rowKeyName, rowKeyInfo.rowKeyType.getTypeClass());
+			} else {
+				Map<String, TypeInformation<?>> familyInfo = getFamilyInfo(name);
+				for (Map.Entry<String, TypeInformation<?>> entry : familyInfo.entrySet()) {
+					// create the newSchema
+					String qualifier = entry.getKey();
+					newSchema.addColumn(name, qualifier, entry.getValue().getTypeClass());
+				}
+			}
+		}
+		newSchema.setCharset(charset);
+		return newSchema;
+	}
+
+	/**
+	 * Converts this {@link HBaseTableSchema} to a {@link TableSchema}. The fields consist of the
+	 * families and the rowkey, in definition order
+	 * (i.e. the order of calls to {@link #addColumn(String, String, Class)} and {@link #setRowKey(String, Class)}).
+	 * Each family field is a composite type composed of its qualifiers.
+	 *
+	 * @return the {@link TableSchema} derived from the {@link HBaseTableSchema}.
+	 */
+	TableSchema convertsToTableSchema() {
+		String[] familyNames = getFamilyNames();
+		if (rowKeyInfo != null) {
+			String[] fieldNames = new String[familyNames.length + 1];
+			TypeInformation<?>[] fieldTypes = new TypeInformation[familyNames.length + 1];
+			for (int i = 0; i < fieldNames.length; i++) {
+				if (i == rowKeyInfo.rowKeyIndex) {
+					fieldNames[i] = rowKeyInfo.rowKeyName;
+					fieldTypes[i] = rowKeyInfo.rowKeyType;
+				} else {
+					int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
+					String family = familyNames[familyIndex];
+					fieldNames[i] = family;
+					fieldTypes[i] = new RowTypeInfo(getQualifierTypes(family), getQualifierNames(family));
+				}
+			}
+			return new TableSchema(fieldNames, fieldTypes);
+		} else {
+			String[] fieldNames = new String[familyNames.length];
+			TypeInformation<?>[] fieldTypes = new TypeInformation[familyNames.length];
+			for (int i = 0; i < fieldNames.length; i++) {
+				String family = familyNames[i];
+				fieldNames[i] = family;
+				fieldTypes[i] = new RowTypeInfo(getQualifierTypes(family), getQualifierNames(family));
+			}
+			return new TableSchema(fieldNames, fieldTypes);
+		}
+	}
+
+	// ------------------------------------------------------------------------------------
+
+	/**
+	 * A class holding information about the row key: rowKeyName, rowKeyType, and rowKeyIndex.
+	 */
+	private static class RowKeyInfo implements Serializable {
+		private static final long serialVersionUID = 1L;
+		final String rowKeyName;
+		final TypeInformation<?> rowKeyType;
+		final int rowKeyIndex;
+
+		RowKeyInfo(String rowKeyName, TypeInformation<?> rowKeyType, int rowKeyIndex) {
+			this.rowKeyName = rowKeyName;
+			this.rowKeyType = rowKeyType;
+			this.rowKeyIndex = rowKeyIndex;
+		}
+	}
 }
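
Note that the rowkey's position in the resulting schema is fixed at the moment setRowKey is called (rowKeyIndex is familyMap.size() at that point). A small sketch, assuming code living in the org.apache.flink.addons.hbase package since setRowKey and convertsToTableSchema are package-private:

    HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumn("fam1", "col1", Integer.class); // family index 0
    schema.setRowKey("rowkey", String.class);        // rowKeyIndex = familyMap.size() = 1
    schema.addColumn("fam2", "col1", String.class);  // registered after the rowkey

    TableSchema ts = schema.convertsToTableSchema();
    // field order: [fam1, rowkey, fam2]; each family is a composite ROW type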
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index c7745b0..3fff5fa 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -18,21 +18,25 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.LookupableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 
-import java.util.Map;
-
 /**
  * Creates a TableSource to scan an HBase table.
  *
@@ -46,22 +50,24 @@ import java.util.Map;
  * <pre>
  * {@code
  * HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
+ * hSrc.setRowKey("rowkey", String.class);
  * hSrc.addColumn("fam1", "col1", byte[].class);
  * hSrc.addColumn("fam1", "col2", Integer.class);
  * hSrc.addColumn("fam2", "col1", String.class);
  *
  * tableEnv.registerTableSource("hTable", hSrc);
- * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
+ * Table res = tableEnv.sqlQuery(
+ *   "SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t " +
+ *   "WHERE t.rowkey LIKE 'flink%' GROUP BY t.fam2.col1");
  * }
  * </pre>
- *
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row>, StreamTableSource<Row>, LookupableTableSource<Row> {
 
-	private Configuration conf;
-	private String tableName;
-	private HBaseTableSchema hBaseSchema;
-	private TableSchema tableSchema;
+	private final Configuration conf;
+	private final String tableName;
+	private final HBaseTableSchema hbaseSchema;
+	private final int[] projectFields;
 
 	/**
 	 * The HBase configuration and the name of the table to read.
@@ -70,16 +76,14 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	 * @param tableName the tableName
 	 */
 	public HBaseTableSource(Configuration conf, String tableName) {
-		this.conf = conf;
-		this.tableName = Preconditions.checkNotNull(tableName, "Table  name");
-		this.hBaseSchema = new HBaseTableSchema();
+		this(conf, tableName, new HBaseTableSchema(), null);
 	}
 
-	private HBaseTableSource(Configuration conf, String tableName, TableSchema tableSchema) {
+	HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema hbaseSchema, int[] projectFields) {
 		this.conf = conf;
 		this.tableName = Preconditions.checkNotNull(tableName, "Table  name");
-		this.hBaseSchema = new HBaseTableSchema();
-		this.tableSchema = tableSchema;
+		this.hbaseSchema = hbaseSchema;
+		this.projectFields = projectFields;
 	}
 
 	/**
@@ -90,7 +94,16 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	 * @param clazz     the data type of the qualifier
 	 */
 	public void addColumn(String family, String qualifier, Class<?> clazz) {
-		this.hBaseSchema.addColumn(family, qualifier, clazz);
+		this.hbaseSchema.addColumn(family, qualifier, clazz);
+	}
+
+	/**
+	 * Sets row key information in the table schema.
+	 * @param rowKeyName the row key field name
+	 * @param clazz the data type of the row key
+	 */
+	public void setRowKey(String rowKeyName, Class<?> clazz) {
+		this.hbaseSchema.setRowKey(rowKeyName, clazz);
 	}
 
 	/**
@@ -99,62 +112,79 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	 * @param charset Name of the charset to use.
 	 */
 	public void setCharset(String charset) {
-		this.hBaseSchema.setCharset(charset);
+		this.hbaseSchema.setCharset(charset);
 	}
 
 	@Override
 	public TypeInformation<Row> getReturnType() {
-		return new RowTypeInfo(getFieldTypes(), getFieldNames());
+		HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+		return projectedSchema.convertsToTableSchema().toRowType();
 	}
 
 	@Override
 	public TableSchema getTableSchema() {
-		if (this.tableSchema == null) {
-			return new TableSchema(getFieldNames(), getFieldTypes());
-		} else {
-			return this.tableSchema;
-		}
+		return hbaseSchema.convertsToTableSchema();
 	}
 
-	private String[] getFieldNames() {
-		return hBaseSchema.getFamilyNames();
+	@Override
+	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+		HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+		return execEnv
+			.createInput(new HBaseRowInputFormat(conf, tableName, projectedSchema), getReturnType())
+			.name(explainSource());
 	}
 
-	private TypeInformation[] getFieldTypes() {
-		String[] famNames = hBaseSchema.getFamilyNames();
-		TypeInformation<?>[] fieldTypes = new TypeInformation[hBaseSchema.getFamilyNames().length];
-		int i = 0;
-		for (String family : famNames) {
-			fieldTypes[i] = new RowTypeInfo(hBaseSchema.getQualifierTypes(family), hBaseSchema.getQualifierNames(family));
-			i++;
-		}
-		return fieldTypes;
+	@Override
+	public HBaseTableSource projectFields(int[] fields) {
+		return new HBaseTableSource(this.conf, tableName, hbaseSchema, fields);
 	}
 
 	@Override
-	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-		return execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hBaseSchema), getReturnType()).name(explainSource());
+	public String explainSource() {
+		return TableConnectorUtils.generateRuntimeName(this.getClass(), getTableSchema().getFieldNames());
 	}
 
 	@Override
-	public HBaseTableSource projectFields(int[] fields) {
-		String[] famNames = hBaseSchema.getFamilyNames();
-		HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName, getTableSchema().copy());
-		// Extract the family from the given fields
-		for (int field : fields) {
-			String family = famNames[field];
-			Map<String, TypeInformation<?>> familyInfo = hBaseSchema.getFamilyInfo(family);
-			for (Map.Entry<String, TypeInformation<?>> entry : familyInfo.entrySet()) {
-				// create the newSchema
-				String qualifier = entry.getKey();
-				newTableSource.addColumn(family, qualifier, entry.getValue().getTypeClass());
-			}
-		}
-		return newTableSource;
+	public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
+		Preconditions.checkArgument(
+			null != lookupKeys && lookupKeys.length == 1,
+			"HBase table can only be retrieved by rowKey for now.");
+		Preconditions.checkState(
+			hbaseSchema.getRowKeyName().isPresent(),
+			"HBase schema must have a row key when used in lookup mode.");
+		Preconditions.checkState(
+			hbaseSchema.getRowKeyName().get().equals(lookupKeys[0]),
+			"The lookup key is not row key of HBase.");
+
+		return new HBaseLookupFunction(
+			this.conf,
+			this.tableName,
+			hbaseSchema.getProjectedHBaseTableSchema(projectFields));
 	}
 
 	@Override
-	public String explainSource() {
-		return TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames());
+	public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
+		throw new UnsupportedOperationException("HBase table doesn't support async lookup currently.");
+	}
+
+	@Override
+	public boolean isAsyncEnabled() {
+		return false;
+	}
+
+	@Override
+	public boolean isBounded() {
+		// HBase source is always bounded.
+		return true;
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		throw new UnsupportedOperationException("HBase table can not convert to DataStream currently.");
+	}
+
+	@VisibleForTesting
+	HBaseTableSchema getHBaseTableSchema() {
+		return this.hbaseSchema;
 	}
 }
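
A sketch of the reworked projection (names are illustrative): projectFields now records the selected top-level field indices and derives the projected HBaseTableSchema lazily, instead of eagerly copying columns into a new source.

    HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
    hSrc.setRowKey("rowkey", String.class);          // field 0
    hSrc.addColumn("fam1", "col1", Integer.class);   // field 1
    hSrc.addColumn("fam2", "col1", String.class);    // field 2

    // keep only the rowkey and fam2
    HBaseTableSource projected = hSrc.projectFields(new int[]{0, 2});
    // projected.getReturnType() -> ROW<rowkey VARCHAR, fam2 ROW<col1 VARCHAR>>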
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseConfigurationUtil.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseConfigurationUtil.java
new file mode 100644
index 0000000..ed5d345
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseConfigurationUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Helper class to serialize and deserialize a Hadoop {@link Configuration}.
+ */
+@Internal
+public class HBaseConfigurationUtil {
+
+	/**
+	 * Serialize a Hadoop {@link Configuration} into byte[].
+	 */
+	public static byte[] serializeConfiguration(Configuration conf) {
+		try {
+			return serializeWritable(conf);
+		} catch (IOException e) {
+			throw new RuntimeException("Encounter an IOException when serialize the Configuration.", e);
+		}
+	}
+
+	/**
+	 * Deserialize a Hadoop {@link Configuration} from byte[].
+	 * Deserializes into {@code targetConfig} if it is set, otherwise into a new {@link Configuration}.
+	 */
+	public static Configuration deserializeConfiguration(byte[] serializedConfig, Configuration targetConfig) {
+		if (null == targetConfig) {
+			targetConfig = new Configuration();
+		}
+		try {
+			deserializeWritable(targetConfig, serializedConfig);
+		} catch (IOException e) {
+			throw new RuntimeException("Encounter an IOException when deserialize the Configuration.", e);
+		}
+		return targetConfig;
+	}
+
+	/**
+	 * Serializes a {@link Writable} into a byte array.
+	 *
+	 * @param <T>      the writable type
+	 * @param writable the writable to serialize
+	 * @return the serialized bytes
+	 * @throws IOException if the serialization fails
+	 */
+	private static <T extends Writable> byte[] serializeWritable(T writable) throws IOException {
+		Preconditions.checkArgument(writable != null);
+
+		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+		DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+		writable.write(outputStream);
+		return byteArrayOutputStream.toByteArray();
+	}
+
+	/**
+	 * Deserializes a {@link Writable} from a byte array.
+	 *
+	 * @param <T>      the writable type
+	 * @param writable the writable instance to populate
+	 * @param bytes    the serialized bytes
+	 * @throws IOException if the deserialization fails
+	 */
+	private static <T extends Writable> void deserializeWritable(T writable, byte[] bytes)
+		throws IOException {
+		Preconditions.checkArgument(writable != null);
+		Preconditions.checkArgument(bytes != null);
+
+		ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+		DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
+		writable.readFields(dataInputStream);
+	}
+}
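
A round-trip sketch of the utility (the quorum address is illustrative), mirroring how HBaseLookupFunction#prepareRuntimeConfiguration overlays the client configuration onto the runtime default:

    // client side: snapshot the effective configuration
    Configuration clientConf = HBaseConfiguration.create();
    clientConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
    byte[] serialized = HBaseConfigurationUtil.serializeConfiguration(clientConf);

    // task side: deserialize on top of the runtime default so client values win
    Configuration runtimeConf = HBaseConfigurationUtil.deserializeConfiguration(
        serialized, HBaseConfiguration.create());
    runtimeConf.get(HConstants.ZOOKEEPER_QUORUM); // "zk1:2181,zk2:2181"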
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
new file mode 100644
index 0000000..1f8fac2
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.nio.charset.Charset;
+
+/**
+ * A read helper for HBase. The helper can be used to create a {@link Scan} or a {@link Get}
+ * for scanning or looking up an HBase table, and supports converting an HBase {@link Result}
+ * into a Flink {@link Row}.
+ */
+public class HBaseReadHelper {
+
+	// family keys
+	private final byte[][] families;
+	// qualifier keys
+	private final byte[][][] qualifiers;
+	// qualifier types
+	private final int[][] qualifierTypes;
+
+	// row key index in output row
+	private final int rowKeyIndex;
+	// type of row key
+	private final int rowKeyType;
+
+	private final int fieldLength;
+	private final Charset charset;
+
+	// row which is returned
+	private Row resultRow;
+	// nested family rows
+	private Row[] familyRows;
+
+	public HBaseReadHelper(HBaseTableSchema hbaseTableSchema) {
+		this.families = hbaseTableSchema.getFamilyKeys();
+		this.qualifiers = new byte[this.families.length][][];
+		this.qualifierTypes = new int[this.families.length][];
+		this.familyRows = new Row[this.families.length];
+		String[] familyNames = hbaseTableSchema.getFamilyNames();
+		for (int f = 0; f < families.length; f++) {
+			this.qualifiers[f] = hbaseTableSchema.getQualifierKeys(familyNames[f]);
+			TypeInformation[] typeInfos = hbaseTableSchema.getQualifierTypes(familyNames[f]);
+			this.qualifierTypes[f] = new int[typeInfos.length];
+			for (int i = 0; i < typeInfos.length; i++) {
+				qualifierTypes[f][i] = HBaseTypeUtils.getTypeIndex(typeInfos[i]);
+			}
+			this.familyRows[f] = new Row(typeInfos.length);
+		}
+		this.charset = Charset.forName(hbaseTableSchema.getStringCharset());
+		// row key
+		this.rowKeyIndex = hbaseTableSchema.getRowKeyIndex();
+		this.rowKeyType = hbaseTableSchema.getRowKeyTypeInfo()
+			.map(HBaseTypeUtils::getTypeIndex)
+			.orElse(-1);
+
+		// the field length needs to take the row key into account if it exists.
+		this.fieldLength = rowKeyIndex == -1 ? families.length : families.length + 1;
+
+		// prepare output rows
+		this.resultRow = new Row(fieldLength);
+	}
+
+	/**
+	 * Returns an instance of Get that retrieves the matching record from the HBase table.
+	 *
+	 * @return The appropriate instance of Get for this use case.
+	 */
+	public Get createGet(Object rowKey) {
+		byte[] rowkey = HBaseTypeUtils.serializeFromObject(
+			rowKey,
+			rowKeyType,
+			charset);
+		Get get = new Get(rowkey);
+		for (int f = 0; f < families.length; f++) {
+			byte[] family = families[f];
+			for (byte[] qualifier : qualifiers[f]) {
+				get.addColumn(family, qualifier);
+			}
+		}
+		return get;
+	}
+
+	/**
+	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
+	 *
+	 * @return The appropriate instance of Scan for this use case.
+	 */
+	public Scan createScan() {
+		Scan scan = new Scan();
+		for (int f = 0; f < families.length; f++) {
+			byte[] family = families[f];
+			for (int q = 0; q < qualifiers[f].length; q++) {
+				byte[] qualifier = qualifiers[f][q];
+				scan.addColumn(family, qualifier);
+			}
+		}
+		return scan;
+	}
+
+	/**
+	 * Parses HBase {@link Result} into {@link Row}.
+	 */
+	public Row parseToRow(Result result) {
+		if (rowKeyIndex == -1) {
+			return parseToRow(result, null);
+		} else {
+			Object rowkey = HBaseTypeUtils.deserializeToObject(result.getRow(), rowKeyType, charset);
+			return parseToRow(result, rowkey);
+		}
+	}
+
+	/**
+	 * Parses HBase {@link Result} into {@link Row}.
+	 */
+	public Row parseToRow(Result result, Object rowKey) {
+		for (int i = 0; i < fieldLength; i++) {
+			if (rowKeyIndex == i) {
+				resultRow.setField(rowKeyIndex, rowKey);
+			} else {
+				int f = (rowKeyIndex != -1 && i > rowKeyIndex) ? i - 1 : i;
+				// get family key
+				byte[] familyKey = families[f];
+				Row familyRow = familyRows[f];
+				for (int q = 0; q < this.qualifiers[f].length; q++) {
+					// get qualifier key
+					byte[] qualifier = qualifiers[f][q];
+					// get qualifier type idx
+					int typeIdx = qualifierTypes[f][q];
+					// read value
+					byte[] value = result.getValue(familyKey, qualifier);
+					if (value != null) {
+						familyRow.setField(q, HBaseTypeUtils.deserializeToObject(value, typeIdx, charset));
+					} else {
+						familyRow.setField(q, null);
+					}
+				}
+				resultRow.setField(i, familyRow);
+			}
+		}
+		return resultRow;
+	}
+}
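
A lookup-path sketch (assuming `schema` is an HBaseTableSchema with a String rowkey and `table` is a connected HTable; both names are illustrative):

    HBaseReadHelper helper = new HBaseReadHelper(schema);
    Get get = helper.createGet("key-1"); // serialized rowkey plus every family:qualifier column
    Result result = table.get(get);
    if (!result.isEmpty()) {
        // note: the helper reuses one Row instance across calls,
        // so copy the row if it must outlive the next parse
        Row row = helper.parseToRow(result, "key-1");
    }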
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
new file mode 100644
index 0000000..83c9eb2
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * A utility class for data exchange with the HBase type system.
+ */
+@Internal
+public class HBaseTypeUtils {
+
+	private static final byte[] EMPTY_BYTES = new byte[]{};
+
+	/**
+	 * Deserialize byte array to Java Object with the given type.
+	 */
+	public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
+		switch (typeIdx) {
+			case 0: // byte[]
+				return value;
+			case 1: // String
+				return new String(value, stringCharset);
+			case 2: // byte
+				return value[0];
+			case 3:
+				return Bytes.toShort(value);
+			case 4:
+				return Bytes.toInt(value);
+			case 5:
+				return Bytes.toLong(value);
+			case 6:
+				return Bytes.toFloat(value);
+			case 7:
+				return Bytes.toDouble(value);
+			case 8:
+				return Bytes.toBoolean(value);
+			case 9: // sql.Timestamp encoded as long
+				return new Timestamp(Bytes.toLong(value));
+			case 10: // sql.Date encoded as long
+				return new Date(Bytes.toLong(value));
+			case 11: // sql.Time encoded as long
+				return new Time(Bytes.toLong(value));
+			case 12:
+				return Bytes.toBigDecimal(value);
+			case 13:
+				return new BigInteger(value);
+
+			default:
+				throw new IllegalArgumentException("unsupported type index:" + typeIdx);
+		}
+	}
+
+	/**
+	 * Serialize the Java Object to byte array with the given type.
+	 */
+	public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset) {
+		switch (typeIdx) {
+			case 0: // byte[]
+				return (byte[]) value;
+			case 1: // external String
+				return value == null ? EMPTY_BYTES : ((String) value).getBytes(stringCharset);
+			case 2: // byte
+				return value == null ? EMPTY_BYTES : new byte[]{(byte) value};
+			case 3:
+				return Bytes.toBytes((short) value);
+			case 4:
+				return Bytes.toBytes((int) value);
+			case 5:
+				return Bytes.toBytes((long) value);
+			case 6:
+				return Bytes.toBytes((float) value);
+			case 7:
+				return Bytes.toBytes((double) value);
+			case 8:
+				return Bytes.toBytes((boolean) value);
+			case 9: // sql.Timestamp encoded as long
+				return Bytes.toBytes(((Timestamp) value).getTime());
+			case 10: // sql.Date encoded as long
+				return Bytes.toBytes(((Date) value).getTime());
+			case 11: // sql.Time encoded as long
+				return Bytes.toBytes(((Time) value).getTime());
+			case 12:
+				return Bytes.toBytes((BigDecimal) value);
+			case 13:
+				return ((BigInteger) value).toByteArray();
+
+			default:
+				throw new IllegalArgumentException("unsupported type index:" + typeIdx);
+		}
+	}
+
+	/**
+	 * Gets the type index (type representation in HBase connector) from the {@link TypeInformation}.
+	 */
+	public static int getTypeIndex(TypeInformation typeInfo) {
+		return getTypeIndex(typeInfo.getTypeClass());
+	}
+
+	/**
+	 * Checks whether the given Class is a supported type in HBase connector.
+	 */
+	public static boolean isSupportedType(Class<?> clazz) {
+		return getTypeIndex(clazz) != -1;
+	}
+
+	private static int getTypeIndex(Class<?> clazz) {
+		if (byte[].class.equals(clazz)) {
+			return 0;
+		} else if (String.class.equals(clazz)) {
+			return 1;
+		} else if (Byte.class.equals(clazz)) {
+			return 2;
+		} else if (Short.class.equals(clazz)) {
+			return 3;
+		} else if (Integer.class.equals(clazz)) {
+			return 4;
+		} else if (Long.class.equals(clazz)) {
+			return 5;
+		} else if (Float.class.equals(clazz)) {
+			return 6;
+		} else if (Double.class.equals(clazz)) {
+			return 7;
+		} else if (Boolean.class.equals(clazz)) {
+			return 8;
+		} else if (Timestamp.class.equals(clazz)) {
+			return 9;
+		} else if (Date.class.equals(clazz)) {
+			return 10;
+		} else if (Time.class.equals(clazz)) {
+			return 11;
+		} else if (BigDecimal.class.equals(clazz)) {
+			return 12;
+		} else if (BigInteger.class.equals(clazz)) {
+			return 13;
+		} else {
+			return -1;
+		}
+	}
+}
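
A quick round trip through the type indices (a sketch; per getTypeIndex above, Types.INT maps to index 4):

    Charset utf8 = Charset.forName("UTF-8");
    int intIdx = HBaseTypeUtils.getTypeIndex(Types.INT);                    // 4
    byte[] bytes = HBaseTypeUtils.serializeFromObject(42, intIdx, utf8);
    Object back = HBaseTypeUtils.deserializeToObject(bytes, intIdx, utf8);  // Integer 42
    boolean ok = HBaseTypeUtils.isSupportedType(java.util.UUID.class);      // false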
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
new file mode 100644
index 0000000..fd844e9
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The validator for HBase.
+ * More features to be supported, e.g., batch read/write, async API (supported since HBase 2.0.0), and caching for the LookupFunction.
+ */
+@Internal
+public class HBaseValidator extends ConnectorDescriptorValidator {
+
+	public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
+	public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
+	public static final String CONNECTOR_HBASE_TABLE_NAME = "connector.table-name";
+	public static final String CONNECTOR_HBASE_ZK_QUORUM = "connector.zookeeper.quorum";
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+		properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE, false);
+		properties.validateString(CONNECTOR_HBASE_TABLE_NAME, false, 1);
+		properties.validateString(CONNECTOR_HBASE_ZK_QUORUM, false, 1);
+		validateVersion(properties);
+	}
+
+	private void validateVersion(DescriptorProperties properties) {
+		final List<String> versions = Arrays.asList(CONNECTOR_VERSION_VALUE_143);
+		properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
+	}
+}
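
The minimal property set this validator accepts matches what the tests further below construct. A sketch of validating a hand-built descriptor (the table name and ZooKeeper address are illustrative):

    import org.apache.flink.table.descriptors.DescriptorProperties;
    import org.apache.flink.table.descriptors.HBaseValidator;

    import java.util.HashMap;
    import java.util.Map;

    public class ValidatorExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("connector.type", "hbase");
            props.put("connector.version", "1.4.3");
            props.put("connector.property-version", "1");
            props.put("connector.table-name", "myTable");              // illustrative
            props.put("connector.zookeeper.quorum", "localhost:2181"); // illustrative

            DescriptorProperties descriptorProperties = new DescriptorProperties(true);
            descriptorProperties.putProperties(props);
            new HBaseValidator().validate(descriptorProperties); // throws on invalid input
        }
    }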
diff --git a/flink-connectors/flink-hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..15cffd6
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.addons.hbase.HBaseTableFactory
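
This service entry is what makes the factory discoverable: TableFactoryService scans the classpath via Java's standard ServiceLoader mechanism and matches registered factories against the supplied properties. The same discovery step, sketched directly:

    import org.apache.flink.table.factories.TableFactory;

    import java.util.ServiceLoader;

    public class DiscoveryExample {
        public static void main(String[] args) {
            // Lists every TableFactory registered via META-INF/services,
            // which should include org.apache.flink.addons.hbase.HBaseTableFactory.
            for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
                System.out.println(factory.getClass().getName());
            }
        }
    }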
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
new file mode 100644
index 0000000..a0822a3
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * IT case for {@link HBaseLookupFunction}.
+ */
+public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
+	private static final String ROWKEY = "rk";
+	private static final String FAMILY1 = "family1";
+	private static final String F1COL1 = "col1";
+
+	private static final String FAMILY2 = "family2";
+	private static final String F2COL1 = "col1";
+	private static final String F2COL2 = "col2";
+
+	private static final String FAMILY3 = "family3";
+	private static final String F3COL1 = "col1";
+	private static final String F3COL2 = "col2";
+	private static final String F3COL3 = "col3";
+
+	private static final String HTABLE_NAME = "testSrcHBaseTable1";
+
+	// prepare a source collection.
+	private static final List<Row> testData1 = new ArrayList<>();
+
+	static {
+		testData1.add(Row.of(1, 1L, "Hi"));
+		testData1.add(Row.of(2, 2L, "Hello"));
+		testData1.add(Row.of(3, 2L, "Hello world"));
+		testData1.add(Row.of(3, 3L, "Hello world!"));
+	}
+
+	private static final TypeInformation<?>[] testTypes1 = {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
+	private static final String[] testColumns1 = {"a", "b", "c"};
+	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(testTypes1, testColumns1);
+
+	@BeforeClass
+	public static void activateHBaseCluster() throws IOException {
+		registerHBaseMiniClusterInClasspath();
+		prepareHBaseTableWithData();
+	}
+
+	private static void prepareHBaseTableWithData() throws IOException {
+		// create a table
+		TableName tableName = TableName.valueOf(HTABLE_NAME);
+		// column families
+		byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
+		// split keys
+		byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
+		createTable(tableName, families, splitKeys);
+
+		// get the HTable instance
+		HTable table = openTable(tableName);
+		List<Put> puts = new ArrayList<>();
+		// add some data
+		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+		// append rows to table
+		table.put(puts);
+		table.close();
+	}
+
+	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+		Put put = new Put(Bytes.toBytes(rowKey));
+		// family 1
+		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+		// family 2
+		if (f2c1 != null) {
+			put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+		}
+		put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+		// family 3
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+		return put;
+	}
+
+	private static Map<String, String> hbaseTableProperties() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
+		properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+		properties.put(CONNECTOR_PROPERTY_VERSION, "1");
+		properties.put(CONNECTOR_HBASE_TABLE_NAME, HTABLE_NAME);
+		// get zk quorum from "hbase-site.xml" in classpath
+		String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
+		properties.put(CONNECTOR_HBASE_ZK_QUORUM, hbaseZk);
+		// schema
+		String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
+		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
+		TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
+		TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+		TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
+
+		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
+		descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+		descriptorProperties.putProperties(properties);
+		return descriptorProperties.asMap();
+	}
+
+	@Test
+	public void testHBaseLookupFunction() throws Exception {
+		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		streamEnv.setParallelism(4);
+		StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
+		StreamITCase.clear();
+
+		// prepare a source table
+		String srcTableName = "testStreamSrcTable1";
+		DataStream<Row> ds = streamEnv.fromCollection(testData1).returns(testTypeInfo1);
+		Table in = streamTableEnv.fromDataStream(ds, String.join(",", testColumns1));
+		streamTableEnv.registerTable(srcTableName, in);
+
+		Map<String, String> tableProperties = hbaseTableProperties();
+		TableSource source = TableFactoryService
+			.find(HBaseTableFactory.class, tableProperties)
+			.createTableSource(tableProperties);
+
+		streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
+
+		// perform a temporal table join query
+		String sqlQuery = "SELECT a, family1.col1, family3.col3 FROM testStreamSrcTable1, LATERAL TABLE(hbaseLookup(a))";
+		Table result = streamTableEnv.sqlQuery(sqlQuery);
+
+		DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<>());
+
+		streamEnv.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,10,Welt-1");
+		expected.add("2,20,Welt-2");
+		expected.add("3,30,Welt-3");
+		expected.add("3,30,Welt-3");
+
+		StreamITCase.compareWithList(expected);
+	}
+}
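
Key 3 is looked up once per matching probe row, which is why "3,30,Welt-3" appears twice in the expected output. The registration-plus-LATERAL-TABLE pattern generalizes to any probe table; a sketch against a hypothetical Orders table whose orderId column carries the HBase row key (table and column names are illustrative):

    // Assumes an HBaseTableSource "hbaseSource" built as in the test above
    // and a registered table "Orders" with an INT column "orderId".
    streamTableEnv.registerFunction(
        "hbaseLookup", hbaseSource.getLookupFunction(new String[]{"rk"}));
    Table joined = streamTableEnv.sqlQuery(
        "SELECT orderId, family1.col1, family3.col3 " +
        "FROM Orders, LATERAL TABLE(hbaseLookup(orderId))");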
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
new file mode 100644
index 0000000..4061c0e
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Unit test for {@link HBaseTableFactory}.
+ */
+public class HBaseTableFactoryTest {
+	private static final String FAMILY1 = "f1";
+	private static final String FAMILY2 = "f2";
+	private static final String FAMILY3 = "f3";
+	private static final String COL1 = "c1";
+	private static final String COL2 = "c2";
+	private static final String COL3 = "c3";
+	private static final String ROWKEY = "rowkey";
+
+	private DescriptorProperties createDescriptor(String[] columnNames, TypeInformation[] columnTypes) {
+		TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
+
+		Map<String, String> tableServiceLookupConf = new HashMap<>();
+		tableServiceLookupConf.put(CONNECTOR_TYPE, "hbase");
+		tableServiceLookupConf.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+		tableServiceLookupConf.put(CONNECTOR_PROPERTY_VERSION, "1");
+		tableServiceLookupConf.put(CONNECTOR_HBASE_TABLE_NAME, "testHBaseTable");
+		tableServiceLookupConf.put(CONNECTOR_HBASE_ZK_QUORUM, "localhost:2181");
+
+		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+		descriptorProperties.putProperties(tableServiceLookupConf);
+		return descriptorProperties;
+	}
+
+	@Test
+	public void testConstructorForNestedSchema() {
+		String[] columnNames = {FAMILY1, FAMILY2, ROWKEY, FAMILY3};
+
+		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{COL1}, Types.INT);
+		TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{COL1, COL2}, Types.INT, Types.LONG);
+		TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{COL1, COL2, COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+		TypeInformation[] columnTypes = new TypeInformation[]{f1, f2, Types.LONG, f3};
+
+		DescriptorProperties descriptorProperties = createDescriptor(columnNames, columnTypes);
+		TableSource source = TableFactoryService.find(HBaseTableFactory.class,
+			descriptorProperties.asMap()).createTableSource(descriptorProperties.asMap());
+		Assert.assertTrue(source instanceof HBaseTableSource);
+		TableFunction<Row> tableFunction = ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY});
+		Assert.assertTrue(tableFunction instanceof HBaseLookupFunction);
+		Assert.assertEquals("testHBastTable", ((HBaseLookupFunction) tableFunction).getHTableName());
+
+		HBaseTableSchema hbaseSchema = ((HBaseTableSource) source).getHBaseTableSchema();
+		Assert.assertEquals(2, hbaseSchema.getRowKeyIndex());
+		Assert.assertEquals(Optional.of(Types.LONG), hbaseSchema.getRowKeyTypeInfo());
+
+		Assert.assertArrayEquals(new String[]{"f1", "f2", "f3"}, hbaseSchema.getFamilyNames());
+		Assert.assertArrayEquals(new String[]{"c1"}, hbaseSchema.getQualifierNames("f1"));
+		Assert.assertArrayEquals(new String[]{"c1", "c2"}, hbaseSchema.getQualifierNames("f2"));
+		Assert.assertArrayEquals(new String[]{"c1", "c2", "c3"}, hbaseSchema.getQualifierNames("f3"));
+
+		Assert.assertArrayEquals(new TypeInformation[]{Types.INT}, hbaseSchema.getQualifierTypes("f1"));
+		Assert.assertArrayEquals(new TypeInformation[]{Types.INT, Types.LONG}, hbaseSchema.getQualifierTypes("f2"));
+		Assert.assertArrayEquals(new TypeInformation[]{Types.DOUBLE, Types.BOOLEAN, Types.STRING}, hbaseSchema.getQualifierTypes("f3"));
+	}
+}
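
The assertions above pin down the nested-schema convention: every ROW-typed column in the TableSchema becomes an HBase column family whose named fields are the qualifiers, while the single non-ROW column ("rowkey", at position 2 here) is treated as the row key. A sketch that walks a parsed schema using the same getters the test exercises:

    HBaseTableSchema hbaseSchema = ((HBaseTableSource) source).getHBaseTableSchema();
    for (String family : hbaseSchema.getFamilyNames()) {
        String[] qualifiers = hbaseSchema.getQualifierNames(family);
        TypeInformation[] types = hbaseSchema.getQualifierTypes(family);
        for (int i = 0; i < qualifiers.length; i++) {
            // e.g. "f2:c2 -> Long"
            System.out.println(family + ":" + qualifiers[i] + " -> " + types[i]);
        }
    }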