You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/10 04:33:40 UTC
[flink] branch master updated: [FLINK-12955][hbase] Support
LookupableTableSource for HBase
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a1636b1 [FLINK-12955][hbase] Support LookupableTableSource for HBase
a1636b1 is described below
commit a1636b1e3b05fb366a9e25a23b555648860d9223
Author: qiangsi.lq <qi...@alibaba-inc.com>
AuthorDate: Sun Jun 30 23:24:10 2019 +0800
[FLINK-12955][hbase] Support LookupableTableSource for HBase
This closes #9045
---
flink-connectors/flink-hbase/pom.xml | 7 +
.../flink/addons/hbase/HBaseLookupFunction.java | 150 +++++++++++++++
.../flink/addons/hbase/HBaseRowInputFormat.java | 163 +----------------
.../flink/addons/hbase/HBaseTableFactory.java | 122 +++++++++++++
.../flink/addons/hbase/HBaseTableSchema.java | 146 ++++++++++++++-
.../flink/addons/hbase/HBaseTableSource.java | 136 ++++++++------
.../addons/hbase/util/HBaseConfigurationUtil.java | 100 ++++++++++
.../flink/addons/hbase/util/HBaseReadHelper.java | 166 +++++++++++++++++
.../flink/addons/hbase/util/HBaseTypeUtils.java | 167 +++++++++++++++++
.../flink/table/descriptors/HBaseValidator.java | 51 ++++++
.../org.apache.flink.table.factories.TableFactory | 16 ++
.../addons/hbase/HBaseLookupFunctionITCase.java | 203 +++++++++++++++++++++
.../flink/addons/hbase/HBaseTableFactoryTest.java | 105 +++++++++++
13 files changed, 1317 insertions(+), 215 deletions(-)
diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index 63580d2..685b4e0 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -288,6 +288,13 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
new file mode 100644
index 0000000..938d834
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.addons.hbase.util.HBaseReadHelper;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The HBaseLookupFunction is a standard user-defined table function; it can be used in the Table API
+ * and is also useful for temporal table join plans in SQL.
+ */
+public class HBaseLookupFunction extends TableFunction<Row> {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
+ private static final long serialVersionUID = 1L;
+
+ private final String hTableName;
+ private final byte[] serializedConfig;
+ private final HBaseTableSchema hbaseTableSchema;
+
+ private transient HBaseReadHelper readHelper;
+ private transient Connection hConnection;
+ private transient HTable table;
+
+ public HBaseLookupFunction(
+ Configuration configuration,
+ String hTableName,
+ HBaseTableSchema hbaseTableSchema) {
+ this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+ this.hTableName = hTableName;
+ this.hbaseTableSchema = hbaseTableSchema;
+ }
+
+ /**
+ * The invoke entry point of lookup function.
+ * @param rowKey the lookup key. Currently only a single rowkey is supported.
+ */
+ public void eval(Object rowKey) throws IOException {
+ // fetch result
+ Result result = table.get(readHelper.createGet(rowKey));
+ if (!result.isEmpty()) {
+ // parse and collect
+ collect(readHelper.parseToRow(result, rowKey));
+ }
+ }
+
+ @Override
+ public TypeInformation<Row> getResultType() {
+ return hbaseTableSchema.convertsToTableSchema().toRowType();
+ }
+
+ private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() {
+ // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+ // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+ // user params from client-side have the highest priority
+ org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+ serializedConfig,
+ HBaseConfiguration.create());
+
+ // do validation: check key option(s) in final runtime configuration
+ if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+ LOG.error("can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+ throw new IllegalArgumentException("check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+ }
+
+ return runtimeConfig;
+ }
+
+ @Override
+ public void open(FunctionContext context) {
+ LOG.info("start open ...");
+ org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+ try {
+ hConnection = ConnectionFactory.createConnection(config);
+ table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
+ } catch (TableNotFoundException tnfe) {
+ LOG.error("Table '{}' not found ", hTableName, tnfe);
+ throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
+ } catch (IOException ioe) {
+ LOG.error("Exception while creating connection to HBase.", ioe);
+ throw new RuntimeException("Cannot create connection to HBase.", ioe);
+ }
+ this.readHelper = new HBaseReadHelper(hbaseTableSchema);
+ LOG.info("end open.");
+ }
+
+ @Override
+ public void close() {
+ LOG.info("start close ...");
+ if (null != table) {
+ try {
+ table.close();
+ table = null;
+ } catch (IOException e) {
+ // ignore exception when close.
+ LOG.warn("exception when close table", e);
+ }
+ }
+ if (null != hConnection) {
+ try {
+ hConnection.close();
+ hConnection = null;
+ } catch (IOException e) {
+ // ignore exception when close.
+ LOG.warn("exception when close connection", e);
+ }
+ }
+ LOG.info("end close.");
+ }
+
+ @VisibleForTesting
+ String getHTableName() {
+ return hTableName;
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
index dde24f0..fd2a116 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.flink.addons.hbase;
+import org.apache.flink.addons.hbase.util.HBaseReadHelper;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -33,17 +34,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.charset.Charset;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
/**
* {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
@@ -53,78 +47,33 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HBaseRowInputFormat.class);
- private String tableName;
- private transient org.apache.hadoop.conf.Configuration conf;
- private HBaseTableSchema schema;
- private transient Charset stringCharset;
- // family keys
- private byte[][] families;
- // qualifier keys
- private byte[][][] qualifiers;
- // qualifier types
- private int[][] types;
+ private final String tableName;
+ private final HBaseTableSchema schema;
- // row which is returned
- private Row resultRow;
- // nested family rows
- private Row[] familyRows;
+ private transient org.apache.hadoop.conf.Configuration conf;
+ private transient HBaseReadHelper readHelper;
public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
this.conf = conf;
this.schema = schema;
-
- // set families, qualifiers, and types
- String[] familyNames = schema.getFamilyNames();
- this.families = schema.getFamilyKeys();
- this.qualifiers = new byte[this.families.length][][];
- this.types = new int[this.families.length][];
- for (int f = 0; f < families.length; f++) {
- this.qualifiers[f] = schema.getQualifierKeys(familyNames[f]);
- TypeInformation[] typeInfos = schema.getQualifierTypes(familyNames[f]);
- this.types[f] = new int[typeInfos.length];
- for (int i = 0; i < typeInfos.length; i++) {
- int typeIdx = getTypeIndex(typeInfos[i].getTypeClass());
- if (typeIdx >= 0) {
- types[f][i] = typeIdx;
- } else {
- throw new IllegalArgumentException("Unsupported type: " + typeInfos[i]);
- }
- }
- }
}
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBase configuration.");
+ // prepare hbase read helper
+ this.readHelper = new HBaseReadHelper(schema);
connectToTable();
if (table != null) {
scan = getScanner();
}
-
- // prepare output rows
- this.resultRow = new Row(families.length);
- this.familyRows = new Row[families.length];
- for (int f = 0; f < families.length; f++) {
- this.familyRows[f] = new Row(qualifiers[f].length);
- this.resultRow.setField(f, this.familyRows[f]);
- }
-
- this.stringCharset = Charset.forName(schema.getStringCharset());
}
@Override
protected Scan getScanner() {
- Scan scan = new Scan();
- for (int f = 0; f < families.length; f++) {
- byte[] family = families[f];
- for (int q = 0; q < qualifiers[f].length; q++) {
- byte[] quantifier = qualifiers[f][q];
- scan.addColumn(family, quantifier);
- }
- }
- return scan;
+ return readHelper.createScan();
}
@Override
@@ -134,26 +83,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
@Override
protected Row mapResultToOutType(Result res) {
- for (int f = 0; f < this.families.length; f++) {
- // get family key
- byte[] familyKey = families[f];
- Row familyRow = familyRows[f];
- for (int q = 0; q < this.qualifiers[f].length; q++) {
- // get quantifier key
- byte[] qualifier = qualifiers[f][q];
- // get quantifier type idx
- int typeIdx = types[f][q];
- // read value
- byte[] value = res.getValue(familyKey, qualifier);
- if (value != null) {
- familyRow.setField(q, deserialize(value, typeIdx));
- } else {
- familyRow.setField(q, null);
- }
- }
- resultRow.setField(f, familyRow);
- }
- return resultRow;
+ return readHelper.parseToRow(res);
}
private void connectToTable() {
@@ -186,79 +116,4 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
}
return new RowTypeInfo(typeInfos, famNames);
}
-
- private Object deserialize(byte[] value, int typeIdx) {
- switch (typeIdx) {
- case 0: // byte[]
- return value;
- case 1:
- return new String(value, stringCharset);
- case 2: // byte
- return value[0];
- case 3:
- return Bytes.toShort(value);
- case 4:
- return Bytes.toInt(value);
- case 5:
- return Bytes.toLong(value);
- case 6:
- return Bytes.toFloat(value);
- case 7:
- return Bytes.toDouble(value);
- case 8:
- return Bytes.toBoolean(value);
- case 9: // sql.Timestamp encoded as long
- return new Timestamp(Bytes.toLong(value));
- case 10: // sql.Date encoded as long
- return new Date(Bytes.toLong(value));
- case 11: // sql.Time encoded as long
- return new Time(Bytes.toLong(value));
- case 12:
- return Bytes.toBigDecimal(value);
- case 13:
- return new BigInteger(value);
-
- default:
- throw new IllegalArgumentException("Unknown type index " + typeIdx);
- }
- }
-
- private static int getTypeIndex(Class<?> clazz) {
- if (byte[].class.equals(clazz)) {
- return 0;
- } else if (String.class.equals(clazz)) {
- return 1;
- } else if (Byte.class.equals(clazz)) {
- return 2;
- } else if (Short.class.equals(clazz)) {
- return 3;
- } else if (Integer.class.equals(clazz)) {
- return 4;
- } else if (Long.class.equals(clazz)) {
- return 5;
- } else if (Float.class.equals(clazz)) {
- return 6;
- } else if (Double.class.equals(clazz)) {
- return 7;
- } else if (Boolean.class.equals(clazz)) {
- return 8;
- } else if (Timestamp.class.equals(clazz)) {
- return 9;
- } else if (Date.class.equals(clazz)) {
- return 10;
- } else if (Time.class.equals(clazz)) {
- return 11;
- } else if (BigDecimal.class.equals(clazz)) {
- return 12;
- } else if (BigInteger.class.equals(clazz)) {
- return 13;
- } else {
- return -1;
- }
- }
-
- static boolean isSupportedType(Class<?> clazz) {
- return getTypeIndex(clazz) != -1;
- }
-
}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
new file mode 100644
index 0000000..922b26f
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.HBaseValidator;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link HBaseTableSource} or sink.
+ */
+public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
+
+ @Override
+ public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+ // create default configuration from current runtime env (`hbase-site.xml` in classpath) first, then set the ZooKeeper quorum from the connector properties
+ Configuration hbaseClientConf = HBaseConfiguration.create();
+ String hbaseZk = properties.get(CONNECTOR_HBASE_ZK_QUORUM);
+ hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
+ String hTableName = descriptorProperties.getString(CONNECTOR_HBASE_TABLE_NAME);
+ TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
+ HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
+ return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
+ }
+
+ private HBaseTableSchema validateTableSchema(TableSchema schema) {
+ HBaseTableSchema hbaseSchema = new HBaseTableSchema();
+ String[] fieldNames = schema.getFieldNames();
+ TypeInformation[] fieldTypes = schema.getFieldTypes();
+ for (int i = 0; i < fieldNames.length; i++) {
+ String name = fieldNames[i];
+ TypeInformation<?> type = fieldTypes[i];
+ if (type instanceof RowTypeInfo) {
+ RowTypeInfo familyType = (RowTypeInfo) type;
+ String[] qualifierNames = familyType.getFieldNames();
+ TypeInformation[] qualifierTypes = familyType.getFieldTypes();
+ for (int j = 0; j < familyType.getArity(); j++) {
+ hbaseSchema.addColumn(name, qualifierNames[j], qualifierTypes[j].getTypeClass());
+ }
+ } else {
+ hbaseSchema.setRowKey(name, type.getTypeClass());
+ }
+ }
+ return hbaseSchema;
+ }
+
+ private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+ new HBaseValidator().validate(descriptorProperties);
+ return descriptorProperties;
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); // hbase
+ context.put(CONNECTOR_VERSION, hbaseVersion()); // version
+ context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+
+ properties.add(CONNECTOR_HBASE_TABLE_NAME);
+ properties.add(CONNECTOR_HBASE_ZK_QUORUM);
+
+ // schema
+ properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+
+ return properties;
+ }
+
+ private String hbaseVersion() {
+ return CONNECTOR_VERSION_VALUE_143;
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index fee9fa9..af642d1 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -18,23 +18,32 @@
package org.apache.flink.addons.hbase;
+import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
/**
* Helps to specify an HBase Table's schema.
*/
public class HBaseTableSchema implements Serializable {
+ private static final long serialVersionUID = 1L;
+
// A Map with key as column family.
private final Map<String, Map<String, TypeInformation<?>>> familyMap = new LinkedHashMap<>();
+ // information about rowkey
+ private RowKeyInfo rowKeyInfo;
+
// charset to parse HBase keys and strings. UTF-8 by default.
private String charset = "UTF-8";
@@ -51,7 +60,7 @@ public class HBaseTableSchema implements Serializable {
Preconditions.checkNotNull(clazz, "class type");
Map<String, TypeInformation<?>> qualifierMap = this.familyMap.get(family);
- if (!HBaseRowInputFormat.isSupportedType(clazz)) {
+ if (!HBaseTypeUtils.isSupportedType(clazz)) {
// throw exception
throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
"Better to use byte[].class and deserialize using user defined scalar functions");
@@ -65,6 +74,25 @@ public class HBaseTableSchema implements Serializable {
}
/**
+ * Sets row key information in the table schema.
+ * @param rowKeyName the row key field name
+ * @param clazz the data type of the row key
+ */
+ void setRowKey(String rowKeyName, Class<?> clazz) {
+ Preconditions.checkNotNull(rowKeyName, "row key field name");
+ Preconditions.checkNotNull(clazz, "row key class type");
+ if (!HBaseTypeUtils.isSupportedType(clazz)) {
+ // throw exception
+ throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
+ "Better to use byte[].class and deserialize using user defined scalar functions");
+ }
+ if (rowKeyInfo != null) {
+ throw new IllegalArgumentException("Row key can't be set multiple times.");
+ }
+ this.rowKeyInfo = new RowKeyInfo(rowKeyName, TypeExtractor.getForClass(clazz), familyMap.size());
+ }
+
+ /**
* Sets the charset for value strings and HBase identifiers.
*
* @param charset the charset for value strings and HBase identifiers.
@@ -78,8 +106,8 @@ public class HBaseTableSchema implements Serializable {
*
* @return The names of all registered column families.
*/
- String[] getFamilyNames() {
- return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
+ public String[] getFamilyNames() {
+ return this.familyMap.keySet().toArray(new String[0]);
}
/**
@@ -87,7 +115,7 @@ public class HBaseTableSchema implements Serializable {
*
* @return The HBase identifiers of all registered column families.
*/
- byte[][] getFamilyKeys() {
+ public byte[][] getFamilyKeys() {
Charset c = Charset.forName(charset);
byte[][] familyKeys = new byte[this.familyMap.size()][];
@@ -126,7 +154,7 @@ public class HBaseTableSchema implements Serializable {
* @param family The name of the column family for which the column qualifier identifiers are returned.
* @return The HBase identifiers of all registered column qualifiers for a specific column family.
*/
- byte[][] getQualifierKeys(String family) {
+ public byte[][] getQualifierKeys(String family) {
Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
@@ -148,7 +176,7 @@ public class HBaseTableSchema implements Serializable {
* @param family The name of the column family for which the column qualifier types are returned.
* @return The types of all registered column qualifiers of a specific column family.
*/
- TypeInformation<?>[] getQualifierTypes(String family) {
+ public TypeInformation<?>[] getQualifierTypes(String family) {
Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
@@ -170,7 +198,7 @@ public class HBaseTableSchema implements Serializable {
* @param family The name of the column family for which the column qualifier names and types are returned.
* @return The names and types of all registered column qualifiers of a specific column family.
*/
- Map<String, TypeInformation<?>> getFamilyInfo(String family) {
+ private Map<String, TypeInformation<?>> getFamilyInfo(String family) {
return familyMap.get(family);
}
@@ -179,8 +207,110 @@ public class HBaseTableSchema implements Serializable {
*
* @return The charset for value strings and HBase identifiers.
*/
- String getStringCharset() {
+ public String getStringCharset() {
return this.charset;
}
+ /**
+ * Returns field index of row key in the table schema. Returns -1 if row key is not set.
+ */
+ public int getRowKeyIndex() {
+ return rowKeyInfo == null ? -1 : rowKeyInfo.rowKeyIndex;
+ }
+
+ /**
+ * Returns the optional type information of row key. Returns an empty Optional if row key is not set.
+ */
+ public Optional<TypeInformation<?>> getRowKeyTypeInfo() {
+ return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyType);
+ }
+
+ /**
+ * Returns optional value of row key name.
+ * The row key name is the field name in hbase schema which can be queried in Flink SQL.
+ */
+ Optional<String> getRowKeyName() {
+ return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyName);
+ }
+
+ /**
+ * Gets a new hbase schema with the selected fields.
+ */
+ HBaseTableSchema getProjectedHBaseTableSchema(int[] projectedFields) {
+ if (projectedFields == null) {
+ return this;
+ }
+ HBaseTableSchema newSchema = new HBaseTableSchema();
+ String[] fieldNames = convertsToTableSchema().getFieldNames();
+ for (int projectedField : projectedFields) {
+ String name = fieldNames[projectedField];
+ if (rowKeyInfo != null && name.equals(rowKeyInfo.rowKeyName)) {
+ newSchema.setRowKey(rowKeyInfo.rowKeyName, rowKeyInfo.rowKeyType.getTypeClass());
+ } else {
+ Map<String, TypeInformation<?>> familyInfo = getFamilyInfo(name);
+ for (Map.Entry<String, TypeInformation<?>> entry : familyInfo.entrySet()) {
+ // create the newSchema
+ String qualifier = entry.getKey();
+ newSchema.addColumn(name, qualifier, entry.getValue().getTypeClass());
+ }
+ }
+ }
+ newSchema.setCharset(charset);
+ return newSchema;
+ }
+
+ /**
+ * Converts this {@link HBaseTableSchema} to {@link TableSchema}. The fields consist
+ * of the families and the rowkey, in definition order
+ * (i.e. calling {@link #addColumn(String, String, Class)} and {@link #setRowKey(String, Class)}).
+ * Each family field is a composite type consisting of its qualifiers.
+ *
+ * @return the {@link TableSchema} derived from the {@link HBaseTableSchema}.
+ */
+ TableSchema convertsToTableSchema() {
+ String[] familyNames = getFamilyNames();
+ if (rowKeyInfo != null) {
+ String[] fieldNames = new String[familyNames.length + 1];
+ TypeInformation<?>[] fieldTypes = new TypeInformation[familyNames.length + 1];
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (i == rowKeyInfo.rowKeyIndex) {
+ fieldNames[i] = rowKeyInfo.rowKeyName;
+ fieldTypes[i] = rowKeyInfo.rowKeyType;
+ } else {
+ int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
+ String family = familyNames[familyIndex];
+ fieldNames[i] = family;
+ fieldTypes[i] = new RowTypeInfo(getQualifierTypes(family), getQualifierNames(family));
+ }
+ }
+ return new TableSchema(fieldNames, fieldTypes);
+ } else {
+ String[] fieldNames = new String[familyNames.length];
+ TypeInformation<?>[] fieldTypes = new TypeInformation[familyNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ String family = familyNames[i];
+ fieldNames[i] = family;
+ fieldTypes[i] = new RowTypeInfo(getQualifierTypes(family), getQualifierNames(family));
+ }
+ return new TableSchema(fieldNames, fieldTypes);
+ }
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ /**
+ * A class that contains information about the rowKey, such as rowKeyName, rowKeyType, rowKeyIndex.
+ */
+ private static class RowKeyInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+ final String rowKeyName;
+ final TypeInformation<?> rowKeyType;
+ final int rowKeyIndex;
+
+ RowKeyInfo(String rowKeyName, TypeInformation<?> rowKeyType, int rowKeyIndex) {
+ this.rowKeyName = rowKeyName;
+ this.rowKeyType = rowKeyType;
+ this.rowKeyIndex = rowKeyIndex;
+ }
+ }
}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index c7745b0..3fff5fa 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -18,21 +18,25 @@
package org.apache.flink.addons.hbase;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import java.util.Map;
-
/**
* Creates a TableSource to scan an HBase table.
*
@@ -46,22 +50,24 @@ import java.util.Map;
* <pre>
* {@code
* HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
+ * hSrc.setRowKey("rowkey", String.class);
* hSrc.addColumn("fam1", "col1", byte[].class);
* hSrc.addColumn("fam1", "col2", Integer.class);
* hSrc.addColumn("fam2", "col1", String.class);
*
* tableEnv.registerTableSource("hTable", hSrc);
- * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
+ * Table res = tableEnv.sqlQuery(
+ * "SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t " +
+ * "WHERE t.rowkey LIKE 'flink%' GROUP BY t.fam2.col1");
* }
* </pre>
- *
*/
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row>, StreamTableSource<Row>, LookupableTableSource<Row> {
- private Configuration conf;
- private String tableName;
- private HBaseTableSchema hBaseSchema;
- private TableSchema tableSchema;
+ private final Configuration conf;
+ private final String tableName;
+ private final HBaseTableSchema hbaseSchema;
+ private final int[] projectFields;
/**
* The HBase configuration and the name of the table to read.
@@ -70,16 +76,14 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
* @param tableName the tableName
*/
public HBaseTableSource(Configuration conf, String tableName) {
- this.conf = conf;
- this.tableName = Preconditions.checkNotNull(tableName, "Table name");
- this.hBaseSchema = new HBaseTableSchema();
+ this(conf, tableName, new HBaseTableSchema(), null);
}
- private HBaseTableSource(Configuration conf, String tableName, TableSchema tableSchema) {
+ HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema hbaseSchema, int[] projectFields) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table name");
- this.hBaseSchema = new HBaseTableSchema();
- this.tableSchema = tableSchema;
+ this.hbaseSchema = hbaseSchema;
+ this.projectFields = projectFields;
}
/**
@@ -90,7 +94,16 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
* @param clazz the data type of the qualifier
*/
public void addColumn(String family, String qualifier, Class<?> clazz) {
- this.hBaseSchema.addColumn(family, qualifier, clazz);
+ this.hbaseSchema.addColumn(family, qualifier, clazz);
+ }
+
+ /**
+ * Sets row key information in the table schema.
+ * @param rowKeyName the row key field name
+ * @param clazz the data type of the row key
+ */
+ public void setRowKey(String rowKeyName, Class<?> clazz) {
+ this.hbaseSchema.setRowKey(rowKeyName, clazz);
}
/**
@@ -99,62 +112,79 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
* @param charset Name of the charset to use.
*/
public void setCharset(String charset) {
- this.hBaseSchema.setCharset(charset);
+ this.hbaseSchema.setCharset(charset);
}
@Override
public TypeInformation<Row> getReturnType() {
- return new RowTypeInfo(getFieldTypes(), getFieldNames());
+ HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+ return projectedSchema.convertsToTableSchema().toRowType();
}
@Override
public TableSchema getTableSchema() {
- if (this.tableSchema == null) {
- return new TableSchema(getFieldNames(), getFieldTypes());
- } else {
- return this.tableSchema;
- }
+ return hbaseSchema.convertsToTableSchema();
}
- private String[] getFieldNames() {
- return hBaseSchema.getFamilyNames();
+ @Override
+ public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+ HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+ return execEnv
+ .createInput(new HBaseRowInputFormat(conf, tableName, projectedSchema), getReturnType())
+ .name(explainSource());
}
- private TypeInformation[] getFieldTypes() {
- String[] famNames = hBaseSchema.getFamilyNames();
- TypeInformation<?>[] fieldTypes = new TypeInformation[hBaseSchema.getFamilyNames().length];
- int i = 0;
- for (String family : famNames) {
- fieldTypes[i] = new RowTypeInfo(hBaseSchema.getQualifierTypes(family), hBaseSchema.getQualifierNames(family));
- i++;
- }
- return fieldTypes;
+ @Override
+ public HBaseTableSource projectFields(int[] fields) {
+ return new HBaseTableSource(this.conf, tableName, hbaseSchema, fields);
}
@Override
- public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
- return execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hBaseSchema), getReturnType()).name(explainSource());
+ public String explainSource() {
+ return TableConnectorUtils.generateRuntimeName(this.getClass(), getTableSchema().getFieldNames());
}
@Override
- public HBaseTableSource projectFields(int[] fields) {
- String[] famNames = hBaseSchema.getFamilyNames();
- HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName, getTableSchema().copy());
- // Extract the family from the given fields
- for (int field : fields) {
- String family = famNames[field];
- Map<String, TypeInformation<?>> familyInfo = hBaseSchema.getFamilyInfo(family);
- for (Map.Entry<String, TypeInformation<?>> entry : familyInfo.entrySet()) {
- // create the newSchema
- String qualifier = entry.getKey();
- newTableSource.addColumn(family, qualifier, entry.getValue().getTypeClass());
- }
- }
- return newTableSource;
+ public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
+ Preconditions.checkArgument(
+ null != lookupKeys && lookupKeys.length == 1,
+ "HBase table can only be retrieved by rowKey for now.");
+ Preconditions.checkState(
+ hbaseSchema.getRowKeyName().isPresent(),
+ "HBase schema must have a row key when used in lookup mode.");
+ Preconditions.checkState(
+ hbaseSchema.getRowKeyName().get().equals(lookupKeys[0]),
+ "The lookup key is not row key of HBase.");
+
+ return new HBaseLookupFunction(
+ this.conf,
+ this.tableName,
+ hbaseSchema.getProjectedHBaseTableSchema(projectFields));
}
@Override
- public String explainSource() {
- return TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames());
+ public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
+ throw new UnsupportedOperationException("HBase table doesn't support async lookup currently.");
+ }
+
+ @Override
+ public boolean isAsyncEnabled() {
+ return false;
+ }
+
+ @Override
+ public boolean isBounded() {
+ // HBase source is always bounded.
+ return true;
+ }
+
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+ throw new UnsupportedOperationException("HBase table can not convert to DataStream currently.");
+ }
+
+ @VisibleForTesting
+ HBaseTableSchema getHBaseTableSchema() {
+ return this.hbaseSchema;
}
}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseConfigurationUtil.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseConfigurationUtil.java
new file mode 100644
index 0000000..ed5d345
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseConfigurationUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Utility for serializing a Hadoop {@link Configuration} to a byte array and restoring it again.
+ */
+@Internal
+public class HBaseConfigurationUtil {
+
+ /**
+ * Serializes a Hadoop {@link Configuration} into a byte array; an {@link IOException} is rethrown as a {@link RuntimeException}.
+ */
+ public static byte[] serializeConfiguration(Configuration conf) {
+ try {
+ return serializeWritable(conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Encounter an IOException when serialize the Configuration.", e);
+ }
+ }
+
+ /**
+ * Deserializes a Hadoop {@link Configuration} from a byte array and returns it.
+ * The entries are read into {@code targetConfig} if it is non-null, otherwise into a newly created {@link Configuration}.
+ */
+ public static Configuration deserializeConfiguration(byte[] serializedConfig, Configuration targetConfig) {
+ if (null == targetConfig) {
+ targetConfig = new Configuration();
+ }
+ try {
+ deserializeWritable(targetConfig, serializedConfig);
+ } catch (IOException e) {
+ throw new RuntimeException("Encounter an IOException when deserialize the Configuration.", e);
+ }
+ return targetConfig;
+ }
+
+ /**
+ * Serializes a {@link Writable} into a byte array.
+ *
+ * @param <T> the concrete {@link Writable} type
+ * @param writable the writable to serialize; must not be null
+ * @return the bytes produced by {@code writable.write(...)}
+ * @throws IOException if the writable fails to write itself
+ */
+ private static <T extends Writable> byte[] serializeWritable(T writable) throws IOException {
+ Preconditions.checkArgument(writable != null);
+
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ writable.write(outputStream);
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ /**
+ * Populates a {@link Writable} from a byte array by calling {@code readFields}.
+ *
+ * @param <T> the concrete {@link Writable} type
+ * @param writable the writable to populate; must not be null
+ * @param bytes the serialized bytes; must not be null
+ * @throws IOException if the writable fails to read its fields
+ */
+ private static <T extends Writable> void deserializeWritable(T writable, byte[] bytes)
+ throws IOException {
+ Preconditions.checkArgument(writable != null);
+ Preconditions.checkArgument(bytes != null);
+
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+ DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
+ writable.readFields(dataInputStream);
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
new file mode 100644
index 0000000..1f8fac2
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.nio.charset.Charset;
+
+/**
+ * A read helper for HBase. The helper can be used to create a {@link Scan} or a {@link Get}
+ * for scanning or looking up an HBase table, and supports converting the HBase {@link Result}
+ * to a Flink {@link Row}.
+ */
+public class HBaseReadHelper {
+
+ // family keys, one entry per column family
+ private final byte[][] families;
+ // qualifier keys, indexed by [family][qualifier]
+ private final byte[][][] qualifiers;
+ // qualifier type indexes as returned by HBaseTypeUtils.getTypeIndex, indexed by [family][qualifier]
+ private final int[][] qualifierTypes;
+
+ // row key index in output row; compared against -1 to detect a schema without row key
+ private final int rowKeyIndex;
+ // type index of the row key, -1 if the schema provides no row key type
+ private final int rowKeyType;
+
+ private final int fieldLength;
+ private final Charset charset;
+
+ // output row; created once and returned by every parseToRow call
+ private Row resultRow;
+ // nested family rows; created once and refilled by every parseToRow call
+ private Row[] familyRows;
+
+ public HBaseReadHelper(HBaseTableSchema hbaseTableSchema) {
+ this.families = hbaseTableSchema.getFamilyKeys();
+ this.qualifiers = new byte[this.families.length][][];
+ this.qualifierTypes = new int[this.families.length][];
+ this.familyRows = new Row[this.families.length];
+ String[] familyNames = hbaseTableSchema.getFamilyNames();
+ for (int f = 0; f < families.length; f++) {
+ this.qualifiers[f] = hbaseTableSchema.getQualifierKeys(familyNames[f]);
+ TypeInformation[] typeInfos = hbaseTableSchema.getQualifierTypes(familyNames[f]);
+ this.qualifierTypes[f] = new int[typeInfos.length];
+ for (int i = 0; i < typeInfos.length; i++) {
+ qualifierTypes[f][i] = HBaseTypeUtils.getTypeIndex(typeInfos[i]);
+ }
+ this.familyRows[f] = new Row(typeInfos.length);
+ }
+ this.charset = Charset.forName(hbaseTableSchema.getStringCharset());
+ // row key
+ this.rowKeyIndex = hbaseTableSchema.getRowKeyIndex();
+ this.rowKeyType = hbaseTableSchema.getRowKeyTypeInfo()
+ .map(HBaseTypeUtils::getTypeIndex)
+ .orElse(-1);
+
+ // the field length needs to take the row key into account if it exists.
+ this.fieldLength = rowKeyIndex == -1 ? families.length : families.length + 1;
+
+ // prepare output rows
+ this.resultRow = new Row(fieldLength);
+ }
+
+ /**
+ * Returns a {@link Get} for the given row key that requests every family/qualifier column of the schema.
+ *
+ * @return The appropriate instance of Get for this use case.
+ */
+ public Get createGet(Object rowKey) {
+ byte[] rowkey = HBaseTypeUtils.serializeFromObject(
+ rowKey,
+ rowKeyType,
+ charset);
+ Get get = new Get(rowkey);
+ for (int f = 0; f < families.length; f++) {
+ byte[] family = families[f];
+ for (byte[] qualifier : qualifiers[f]) {
+ get.addColumn(family, qualifier);
+ }
+ }
+ return get;
+ }
+
+ /**
+ * Returns a {@link Scan} that requests every family/qualifier column of the schema.
+ *
+ * @return The appropriate instance of Scan for this use case.
+ */
+ public Scan createScan() {
+ Scan scan = new Scan();
+ for (int f = 0; f < families.length; f++) {
+ byte[] family = families[f];
+ for (int q = 0; q < qualifiers[f].length; q++) {
+ byte[] quantifier = qualifiers[f][q];
+ scan.addColumn(family, quantifier);
+ }
+ }
+ return scan;
+ }
+
+ /**
+ * Parses HBase {@link Result} into {@link Row}, deserializing the row key from the result if the schema has one.
+ */
+ public Row parseToRow(Result result) {
+ if (rowKeyIndex == -1) {
+ return parseToRow(result, null);
+ } else {
+ Object rowkey = HBaseTypeUtils.deserializeToObject(result.getRow(), rowKeyType, charset);
+ return parseToRow(result, rowkey);
+ }
+ }
+
+ /**
+ * Parses HBase {@link Result} into {@link Row} using the given row key object; returns the reused output row.
+ */
+ public Row parseToRow(Result result, Object rowKey) {
+ for (int i = 0; i < fieldLength; i++) {
+ if (rowKeyIndex == i) {
+ resultRow.setField(rowKeyIndex, rowKey);
+ } else {
+ int f = (rowKeyIndex != -1 && i > rowKeyIndex) ? i - 1 : i; // fields after the row key map to family index i - 1
+ // get family key
+ byte[] familyKey = families[f];
+ Row familyRow = familyRows[f];
+ for (int q = 0; q < this.qualifiers[f].length; q++) {
+ // get qualifier key
+ byte[] qualifier = qualifiers[f][q];
+ // get qualifier type idx
+ int typeIdx = qualifierTypes[f][q];
+ // read value
+ byte[] value = result.getValue(familyKey, qualifier);
+ if (value != null) {
+ familyRow.setField(q, HBaseTypeUtils.deserializeToObject(value, typeIdx, charset));
+ } else {
+ familyRow.setField(q, null);
+ }
+ }
+ resultRow.setField(i, familyRow);
+ }
+ }
+ return resultRow;
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
new file mode 100644
index 0000000..83c9eb2
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.commons.net.ntp.TimeStamp;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * A utility class to process data exchange with HBase type system.
+ */
+@Internal
+public class HBaseTypeUtils {
+
+ private static final byte[] EMPTY_BYTES = new byte[]{};
+
+ /**
+ * Deserialize byte array to Java Object with the given type index (see {@code getTypeIndex}); throws IllegalArgumentException for an unknown index.
+ */
+ public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
+ switch (typeIdx) {
+ case 0: // byte[]
+ return value;
+ case 1: // String
+ return new String(value, stringCharset);
+ case 2: // byte
+ return value[0];
+ case 3: // short
+ return Bytes.toShort(value);
+ case 4: // int
+ return Bytes.toInt(value);
+ case 5: // long
+ return Bytes.toLong(value);
+ case 6: // float
+ return Bytes.toFloat(value);
+ case 7: // double
+ return Bytes.toDouble(value);
+ case 8: // boolean
+ return Bytes.toBoolean(value);
+ case 9: // sql.Timestamp encoded as long
+ return new Timestamp(Bytes.toLong(value));
+ case 10: // sql.Date encoded as long
+ return new Date(Bytes.toLong(value));
+ case 11: // sql.Time encoded as long
+ return new Time(Bytes.toLong(value));
+ case 12: // BigDecimal
+ return Bytes.toBigDecimal(value);
+ case 13: // BigInteger
+ return new BigInteger(value);
+
+ default:
+ throw new IllegalArgumentException("unsupported type index:" + typeIdx);
+ }
+ }
+
+ /**
+ * Serialize the Java Object to byte array with the given type index. A null String or Byte yields an empty array, a null byte[] is returned as null; the other types require a non-null value.
+ */
+ public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset) {
+ switch (typeIdx) {
+ case 0: // byte[]
+ return (byte[]) value;
+ case 1: // String
+ return value == null ? EMPTY_BYTES : ((String) value).getBytes(stringCharset);
+ case 2: // byte
+ return value == null ? EMPTY_BYTES : new byte[]{(byte) value};
+ case 3: // short
+ return Bytes.toBytes((short) value);
+ case 4: // int
+ return Bytes.toBytes((int) value);
+ case 5: // long
+ return Bytes.toBytes((long) value);
+ case 6: // float
+ return Bytes.toBytes((float) value);
+ case 7: // double
+ return Bytes.toBytes((double) value);
+ case 8: // boolean
+ return Bytes.toBytes((boolean) value);
+ case 9: // java.sql.Timestamp encoded as long; must match getTypeIndex/deserializeToObject, not commons-net TimeStamp
+ return Bytes.toBytes(((Timestamp) value).getTime());
+ case 10: // sql.Date encoded as long
+ return Bytes.toBytes(((Date) value).getTime());
+ case 11: // sql.Time encoded as long
+ return Bytes.toBytes(((Time) value).getTime());
+ case 12: // BigDecimal
+ return Bytes.toBytes((BigDecimal) value);
+ case 13: // BigInteger
+ return ((BigInteger) value).toByteArray();
+
+ default:
+ throw new IllegalArgumentException("unsupported type index:" + typeIdx);
+ }
+ }
+
+ /**
+ * Gets the type index (type representation in HBase connector) from the {@link TypeInformation}; -1 if the type class is not supported.
+ */
+ public static int getTypeIndex(TypeInformation typeInfo) {
+ return getTypeIndex(typeInfo.getTypeClass());
+ }
+
+ /**
+ * Checks whether the given Class is a supported type in HBase connector.
+ */
+ public static boolean isSupportedType(Class<?> clazz) {
+ return getTypeIndex(clazz) != -1;
+ }
+
+ private static int getTypeIndex(Class<?> clazz) {
+ if (byte[].class.equals(clazz)) {
+ return 0;
+ } else if (String.class.equals(clazz)) {
+ return 1;
+ } else if (Byte.class.equals(clazz)) {
+ return 2;
+ } else if (Short.class.equals(clazz)) {
+ return 3;
+ } else if (Integer.class.equals(clazz)) {
+ return 4;
+ } else if (Long.class.equals(clazz)) {
+ return 5;
+ } else if (Float.class.equals(clazz)) {
+ return 6;
+ } else if (Double.class.equals(clazz)) {
+ return 7;
+ } else if (Boolean.class.equals(clazz)) {
+ return 8;
+ } else if (Timestamp.class.equals(clazz)) {
+ return 9;
+ } else if (Date.class.equals(clazz)) {
+ return 10;
+ } else if (Time.class.equals(clazz)) {
+ return 11;
+ } else if (BigDecimal.class.equals(clazz)) {
+ return 12;
+ } else if (BigInteger.class.equals(clazz)) {
+ return 13;
+ } else {
+ return -1;
+ }
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
new file mode 100644
index 0000000..fd844e9
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The validator for the HBase connector descriptor: checks connector type, table name, ZooKeeper quorum and version.
+ * More features are to be supported, e.g., batch read/write, the async API (supported from HBase version 2.0.0), and caching for the LookupFunction.
+ */
+@Internal
+public class HBaseValidator extends ConnectorDescriptorValidator {
+
+ public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
+ public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
+ public static final String CONNECTOR_HBASE_TABLE_NAME = "connector.table-name";
+ public static final String CONNECTOR_HBASE_ZK_QUORUM = "connector.zookeeper.quorum";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE, false);
+ properties.validateString(CONNECTOR_HBASE_TABLE_NAME, false, 1);
+ properties.validateString(CONNECTOR_HBASE_ZK_QUORUM, false, 1);
+ validateVersion(properties);
+ }
+
+ private void validateVersion(DescriptorProperties properties) {
+ final List<String> versions = Arrays.asList(CONNECTOR_VERSION_VALUE_143); // the only version value listed as valid
+ properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..15cffd6
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.addons.hbase.HBaseTableFactory
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
new file mode 100644
index 0000000..a0822a3
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Integration test for the HBase lookup function obtained from {@link HBaseTableSource}.
+ */
+public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
+ private static final String ROWKEY = "rk";
+ private static final String FAMILY1 = "family1";
+ private static final String F1COL1 = "col1";
+
+ private static final String FAMILY2 = "family2";
+ private static final String F2COL1 = "col1";
+ private static final String F2COL2 = "col2";
+
+ private static final String FAMILY3 = "family3";
+ private static final String F3COL1 = "col1";
+ private static final String F3COL2 = "col2";
+ private static final String F3COL3 = "col3";
+
+ private static final String HTABLE_NAME = "testSrcHBaseTable1";
+
+ // rows of the streaming source table that is joined against HBase; column "a" is used as the lookup key.
+ private static final List<Row> testData1 = new ArrayList<>();
+
+ static {
+ testData1.add(Row.of(1, 1L, "Hi"));
+ testData1.add(Row.of(2, 2L, "Hello"));
+ testData1.add(Row.of(3, 2L, "Hello world"));
+ testData1.add(Row.of(3, 3L, "Hello world!"));
+ }
+
+ private static final TypeInformation<?>[] testTypes1 = {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
+ private static final String[] testColumns1 = {"a", "b", "c"};
+ private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(testTypes1, testColumns1);
+
+ @BeforeClass
+ public static void activateHBaseCluster() throws IOException {
+ registerHBaseMiniClusterInClasspath();
+ prepareHBaseTableWithData();
+ }
+
+ private static void prepareHBaseTableWithData() throws IOException {
+ // create a table
+ TableName tableName = TableName.valueOf(HTABLE_NAME);
+ // column families
+ byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
+ // split keys
+ byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
+ createTable(tableName, families, splitKeys);
+
+ // get the HTable instance
+ HTable table = openTable(tableName);
+ List<Put> puts = new ArrayList<>();
+ // add some data; rows 4 and 8 have no value for family2:col1
+ puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+ puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+ puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+ puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+ puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+ puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+ puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+ puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+ // append rows to table
+ table.put(puts);
+ table.close();
+ }
+
+ private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+ Put put = new Put(Bytes.toBytes(rowKey));
+ // family 1
+ put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+ // family 2; col1 is only written when a value is given
+ if (f2c1 != null) {
+ put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+ }
+ put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+ // family 3
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+ return put;
+ }
+
+ private static Map<String, String> hbaseTableProperties() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
+ properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+ properties.put(CONNECTOR_PROPERTY_VERSION, "1");
+ properties.put(CONNECTOR_HBASE_TABLE_NAME, HTABLE_NAME);
+ // get zk quorum from "hbase-site.xml" in classpath
+ String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
+ properties.put(CONNECTOR_HBASE_ZK_QUORUM, hbaseZk);
+ // schema: the row key is deliberately declared between the column families
+ String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
+ TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
+ TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
+ TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+ TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
+
+ DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
+ descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+ descriptorProperties.putProperties(properties);
+ return descriptorProperties.asMap();
+ }
+
+ @Test
+ public void testHBaseLookupFunction() throws Exception {
+ StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ streamEnv.setParallelism(4);
+ StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
+ StreamITCase.clear();
+
+ // prepare a source table
+ String srcTableName = "testStreamSrcTable1";
+ DataStream<Row> ds = streamEnv.fromCollection(testData1).returns(testTypeInfo1);
+ Table in = streamTableEnv.fromDataStream(ds, String.join(",", testColumns1));
+ streamTableEnv.registerTable(srcTableName, in);
+
+ Map<String, String> tableProperties = hbaseTableProperties();
+ TableSource source = TableFactoryService
+ .find(HBaseTableFactory.class, tableProperties)
+ .createTableSource(tableProperties);
+
+ streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
+
+ // join every source row with the registered lookup function via LATERAL TABLE, using column a as row key
+ String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM testStreamSrcTable1, LATERAL TABLE(hbaseLookup(a))";
+ Table result = streamTableEnv.sqlQuery(sqlQuery);
+
+ DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink<>());
+
+ streamEnv.execute();
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1,10,Welt-1");
+ expected.add("2,20,Welt-2");
+ expected.add("3,30,Welt-3");
+ expected.add("3,30,Welt-3");
+
+ StreamITCase.compareWithList(expected);
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
new file mode 100644
index 0000000..4061c0e
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Unit test for {@link HBaseTableFactory}.
+ *
+ * <p>Builds descriptor properties for a nested table schema, lets the factory create the
+ * table source from them and checks the resulting {@link HBaseTableSchema} and lookup function.
+ * No assertion here reads from or writes to an HBase table.
+ */
+public class HBaseTableFactoryTest {
+    private static final String FAMILY1 = "f1";
+    private static final String FAMILY2 = "f2";
+    private static final String FAMILY3 = "f3";
+    private static final String COL1 = "c1";
+    private static final String COL2 = "c2";
+    private static final String COL3 = "c3";
+    private static final String ROWKEY = "rowkey";
+    // single definition of the table name shared by the descriptor and the assertion on it
+    private static final String TABLE_NAME = "testHBastTable";
+
+    /**
+     * Creates the descriptor properties of an HBase table with the given schema.
+     *
+     * @param columnNames names of the top-level columns of the table schema
+     * @param columnTypes types of the top-level columns, in the same order as {@code columnNames}
+     * @return properties holding the connector settings and the table schema
+     */
+    private DescriptorProperties createDescriptor(String[] columnNames, TypeInformation<?>[] columnTypes) {
+        TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
+
+        Map<String, String> connectorProperties = new HashMap<>();
+        connectorProperties.put(CONNECTOR_TYPE, "hbase");
+        connectorProperties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+        connectorProperties.put(CONNECTOR_PROPERTY_VERSION, "1");
+        connectorProperties.put(CONNECTOR_HBASE_TABLE_NAME, TABLE_NAME);
+        connectorProperties.put(CONNECTOR_HBASE_ZK_QUORUM, "localhost:2181");
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+        descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+        descriptorProperties.putProperties(connectorProperties);
+        return descriptorProperties;
+    }
+
+    /**
+     * Checks that a schema made of three row-typed columns and one {@code LONG} column
+     * yields an {@link HBaseTableSource} whose HBase schema reports the {@code LONG} column
+     * as row key and the row-typed columns as families with their qualifiers and types.
+     */
+    @Test
+    public void testConstructorForNestedSchema() {
+        String[] columnNames = {FAMILY1, FAMILY2, ROWKEY, FAMILY3};
+
+        TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{COL1}, Types.INT);
+        TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{COL1, COL2}, Types.INT, Types.LONG);
+        TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{COL1, COL2, COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+        TypeInformation<?>[] columnTypes = {f1, f2, Types.LONG, f3};
+
+        // materialise the property map once and use it for both factory discovery and creation
+        Map<String, String> properties = createDescriptor(columnNames, columnTypes).asMap();
+        TableSource<?> source = TableFactoryService
+            .find(HBaseTableFactory.class, properties)
+            .createTableSource(properties);
+        Assert.assertTrue(source instanceof HBaseTableSource);
+        HBaseTableSource hbaseSource = (HBaseTableSource) source;
+
+        TableFunction<Row> tableFunction = hbaseSource.getLookupFunction(new String[]{ROWKEY});
+        Assert.assertTrue(tableFunction instanceof HBaseLookupFunction);
+        Assert.assertEquals(TABLE_NAME, ((HBaseLookupFunction) tableFunction).getHTableName());
+
+        HBaseTableSchema hbaseSchema = hbaseSource.getHBaseTableSchema();
+        // ROWKEY sits at position 2 of columnNames
+        Assert.assertEquals(2, hbaseSchema.getRowKeyIndex());
+        Assert.assertEquals(Optional.of(Types.LONG), hbaseSchema.getRowKeyTypeInfo());
+
+        Assert.assertArrayEquals(new String[]{FAMILY1, FAMILY2, FAMILY3}, hbaseSchema.getFamilyNames());
+        Assert.assertArrayEquals(new String[]{COL1}, hbaseSchema.getQualifierNames(FAMILY1));
+        Assert.assertArrayEquals(new String[]{COL1, COL2}, hbaseSchema.getQualifierNames(FAMILY2));
+        Assert.assertArrayEquals(new String[]{COL1, COL2, COL3}, hbaseSchema.getQualifierNames(FAMILY3));
+
+        Assert.assertArrayEquals(new TypeInformation<?>[]{Types.INT}, hbaseSchema.getQualifierTypes(FAMILY1));
+        Assert.assertArrayEquals(new TypeInformation<?>[]{Types.INT, Types.LONG}, hbaseSchema.getQualifierTypes(FAMILY2));
+        Assert.assertArrayEquals(new TypeInformation<?>[]{Types.DOUBLE, Types.BOOLEAN, Types.STRING}, hbaseSchema.getQualifierTypes(FAMILY3));
+    }
+}