Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/11 12:23:30 UTC

[incubator-inlong] branch master updated: [INLONG-4156][Sort] Add HBase load node (#4158)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fc162105a [INLONG-4156][Sort] Add HBase load node (#4158)
fc162105a is described below

commit fc162105a6dfcd8dd27d2d41ee6347bf72f5b747
Author: pacino <ge...@gmail.com>
AuthorDate: Wed May 11 20:23:24 2022 +0800

    [INLONG-4156][Sort] Add HBase load node (#4158)
---
 .../sort/protocol/constant/HBaseConstant.java      |  42 +++++++
 .../apache/inlong/sort/protocol/node/LoadNode.java |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |  10 +-
 .../sort/protocol/node/load/HbaseLoadNode.java     | 128 +++++++++++++++++++++
 .../sort/protocol/node/load/HbaseLoadNodeTest.java |  38 ++++++
 inlong-sort/sort-single-tenant/pom.xml             |   5 +
 .../flink/parser/impl/FlinkSqlParser.java          |  82 ++++++++++++-
 .../flink/parser/HbaseLoadFlinkSqlParseTest.java   | 122 ++++++++++++++++++++
 licenses/inlong-sort/LICENSE                       |   1 +
 licenses/inlong-sort/NOTICE                        |  15 +++
 pom.xml                                            |   5 +
 11 files changed, 442 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HBaseConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HBaseConstant.java
new file mode 100644
index 000000000..a16b9187d
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HBaseConstant.java
@@ -0,0 +1,42 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * HBase option constants
+ */
+public class HBaseConstant {
+
+    public static final String CONNECTOR = "connector";
+
+    public static final String HBASE_2 = "hbase-2.2";
+
+    public static final String TABLE_NAME = "table-name";
+
+    public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
+
+    public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
+
+    public static final String SINK_BUFFER_FLUSH_MAX_SIZE = "sink.buffer-flush.max-size";
+
+    public static final String SINK_BUFFER_FLUSH_MAX_ROWS = "sink.buffer-flush.max-rows";
+
+    public static final String SINK_BUFFER_FLUSH_INTERVAL = "sink.buffer-flush.interval";
+
+}
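
These keys are the table options of Flink's "hbase-2.2" connector; the new
HbaseLoadNode below copies them into the map that FlinkSqlParser renders as
the DDL's WITH clause. A minimal sketch of that clause, assuming the sample
values used in this commit's tests; the two flush values shown are invented
placeholders, and the sink.buffer-flush.* and zookeeper.znode.parent options
are only emitted when set:

    with (
        'connector' = 'hbase-2.2',
        'table-name' = 'default:mytable',
        'zookeeper.quorum' = 'localhost:2181',
        'sink.buffer-flush.max-rows' = '1000',
        'sink.buffer-flush.interval' = '2s'
    )
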
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index a87c5b7a4..89217573c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -27,6 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -43,7 +44,8 @@ import java.util.Map;
         property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
-        @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad")
+        @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
+        @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 210994ba8..4873f7725 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -17,9 +17,6 @@
 
 package org.apache.inlong.sort.protocol.node;
 
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -32,6 +29,11 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
@@ -42,7 +43,8 @@ import org.apache.inlong.sort.protocol.node.transform.TransformNode;
         @JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
         @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
-        @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad")
+        @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
+        @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
 })
 public interface Node {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
new file mode 100644
index 000000000..3841454d9
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
@@ -0,0 +1,128 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HBaseConstant;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HBase load node for generating the HBase connector DDL
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("hbaseLoad")
+@Data
+@NoArgsConstructor
+public class HbaseLoadNode extends LoadNode implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty("tableName")
+    private String tableName;
+
+    @JsonProperty("nameSpace")
+    private String nameSpace;
+
+    @JsonProperty("zookeeperQuorum")
+    private String zookeeperQuorum;
+
+    @JsonProperty("rowKey")
+    private String rowKey;
+
+    @JsonProperty("sinkBufferFlushMaxSize")
+    private String sinkBufferFlushMaxSize;
+
+    @JsonProperty("zookeeperZnodeParent")
+    private String zookeeperZnodeParent;
+
+    @JsonProperty("sinkBufferFlushMaxRows")
+    private String sinkBufferFlushMaxRows;
+
+    @JsonProperty("sinkBufferFlushInterval")
+    private String sinkBufferFlushInterval;
+
+    @JsonCreator
+    public HbaseLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("nameSpace") String nameSpace,
+            @JsonProperty("zookeeperQuorum") String zookeeperQuorum,
+            @JsonProperty("rowKey") String rowKey,
+            @JsonProperty("sinkBufferFlushMaxSize") String sinkBufferFlushMaxSize,
+            @JsonProperty("zookeeperZnodeParent") String zookeeperZnodeParent,
+            @JsonProperty("sinkBufferFlushMaxRows") String sinkBufferFlushMaxRows,
+            @JsonProperty("sinkBufferFlushInterval") String sinkBufferFlushInterval) {
+        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        this.tableName = Preconditions.checkNotNull(tableName, "tableName of hbase is null");
+        this.nameSpace = Preconditions.checkNotNull(nameSpace, "nameSpace of hbase is null");
+        this.zookeeperQuorum = Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum of hbase is null");
+        this.rowKey = Preconditions.checkNotNull(rowKey, "rowKey of hbase is null");
+        this.sinkBufferFlushMaxSize = sinkBufferFlushMaxSize;
+        this.zookeeperZnodeParent = zookeeperZnodeParent;
+        this.sinkBufferFlushMaxRows = sinkBufferFlushMaxRows;
+        this.sinkBufferFlushInterval = sinkBufferFlushInterval;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> map = super.tableOptions();
+        map.put(HBaseConstant.CONNECTOR, HBaseConstant.HBASE_2);
+        map.put(HBaseConstant.TABLE_NAME, nameSpace + ":" + tableName);
+        map.put(HBaseConstant.ZOOKEEPER_QUORUM, zookeeperQuorum);
+        if (StringUtils.isNotEmpty(sinkBufferFlushInterval)) {
+            map.put(HBaseConstant.SINK_BUFFER_FLUSH_INTERVAL, sinkBufferFlushInterval);
+        }
+        if (StringUtils.isNotEmpty(zookeeperZnodeParent)) {
+            map.put(HBaseConstant.ZOOKEEPER_ZNODE_PARENT, zookeeperZnodeParent);
+        }
+        if (StringUtils.isNotEmpty(sinkBufferFlushMaxRows)) {
+            map.put(HBaseConstant.SINK_BUFFER_FLUSH_MAX_ROWS, sinkBufferFlushMaxRows);
+        }
+        if (StringUtils.isNotEmpty(sinkBufferFlushMaxSize)) {
+            map.put(HBaseConstant.SINK_BUFFER_FLUSH_MAX_SIZE, sinkBufferFlushMaxSize);
+        }
+        return map;
+    }
+
+    @Override
+    public String genTableName() {
+        return this.tableName;
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
new file mode 100644
index 000000000..483d26757
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
@@ -0,0 +1,38 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+
+import java.util.Arrays;
+
+public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
+
+    @Override
+    public HbaseLoadNode getTestObject() {
+        return new HbaseLoadNode("2", "test_hbase",
+                Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo())),
+                Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+                        new FieldInfo("cf:id", new StringFormatInfo()))), null, null, 1, null, "mytable", "default",
+                "localhost:2181", "MD5(`id`)", null, null, null, null);
+    }
+}
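
Given the @JsonTypeName and @JsonProperty annotations on HbaseLoadNode, the
object this round-trip test serializes should look roughly like the JSON
below (property order approximate; the fields/fieldRelationShips payloads
are elided rather than guessed):

    {
      "type": "hbaseLoad",
      "id": "2",
      "name": "test_hbase",
      "fields": [ ... ],
      "fieldRelationShips": [ ... ],
      "sinkParallelism": 1,
      "tableName": "mytable",
      "nameSpace": "default",
      "zookeeperQuorum": "localhost:2181",
      "rowKey": "MD5(`id`)"
    }
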
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index d9a7a4af7..13fb179a4 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -282,6 +282,11 @@
             <version>${debezium-core.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index 07a7822ae..2a15caaec 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -493,16 +494,40 @@ public class FlinkSqlParser implements Parser {
         StringBuilder sb = new StringBuilder();
         sb.append("INSERT INTO `").append(loadNode.genTableName()).append("` ");
         sb.append("\n    SELECT ");
-        Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(loadNode.getFieldRelationShips().size());
-        loadNode.getFieldRelationShips().forEach(s -> {
-            fieldRelationMap.put(s.getOutputField().getName(), s);
-        });
-        parseFieldRelations(loadNode.getFields(), fieldRelationMap, sb);
+        if (loadNode instanceof HbaseLoadNode) {
+            parseHbaseLoadFieldRelation((HbaseLoadNode) loadNode, sb);
+        } else {
+            Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(loadNode.getFieldRelationShips().size());
+            loadNode.getFieldRelationShips().forEach(s -> {
+                fieldRelationMap.put(s.getOutputField().getName(), s);
+            });
+            parseFieldRelations(loadNode.getFields(), fieldRelationMap, sb);
+        }
         sb.append("\n    FROM `").append(inputNode.genTableName()).append("`");
         parseFilterFields(loadNode.getFilterStrategy(), loadNode.getFilters(), sb);
         return sb.toString();
     }
 
+    private void parseHbaseLoadFieldRelation(HbaseLoadNode hbaseLoadNode, StringBuilder sb) {
+        sb.append(hbaseLoadNode.getRowKey()).append(" as rowkey,\n");
+        List<FieldRelationShip> fieldRelationShips = hbaseLoadNode.getFieldRelationShips();
+        Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
+                fieldRelationShips);
+        for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+            StringBuilder fieldAppend = new StringBuilder(" ROW(");
+            for (FieldRelationShip fieldRelationShip : entry.getValue()) {
+                FieldInfo fieldInfo = (FieldInfo) fieldRelationShip.getInputField();
+                fieldAppend.append(fieldInfo.getName()).append(",");
+            }
+            if (fieldAppend.length() > 0) {
+                fieldAppend.delete(fieldAppend.lastIndexOf(","), fieldAppend.length());
+            }
+            fieldAppend.append("),");
+            sb.append(fieldAppend);
+        }
+        sb.delete(sb.lastIndexOf(","), sb.length());
+    }
+
     /**
      * Generate create sql
      *
@@ -513,6 +538,9 @@ public class FlinkSqlParser implements Parser {
         if (node instanceof TransformNode) {
             return genCreateTransformSql(node);
         }
+        if (node instanceof HbaseLoadNode) {
+            return genCreateHbaseLoadSql((HbaseLoadNode) node);
+        }
         StringBuilder sb = new StringBuilder("CREATE TABLE `");
         sb.append(node.genTableName()).append("`(\n");
         sb.append(genPrimaryKey(node.getPrimaryKey()));
@@ -532,6 +560,50 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
+    /**
+     * Generate the CREATE TABLE DDL for an HBase load node
+     *
+     * @param node the HBase load node
+     * @return the CREATE TABLE statement
+     */
+    private String genCreateHbaseLoadSql(HbaseLoadNode node) {
+        StringBuilder sb = new StringBuilder("CREATE TABLE `");
+        sb.append(node.genTableName()).append("`(\n");
+        sb.append("rowkey STRING,\n");
+        List<FieldRelationShip> fieldRelationShips = node.getFieldRelationShips();
+        Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
+                fieldRelationShips);
+        for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+            sb.append(entry.getKey());
+            StringBuilder fieldsAppend = new StringBuilder(" Row<");
+            for (FieldRelationShip fieldRelationShip : entry.getValue()) {
+                FieldInfo fieldInfo = fieldRelationShip.getOutputField();
+                fieldsAppend.append(fieldInfo.getName().split(":")[1]).append(" ")
+                        .append(TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()).asSummaryString())
+                        .append(",");
+            }
+            if (fieldsAppend.length() > 0) {
+                fieldsAppend.delete(fieldsAppend.lastIndexOf(","), fieldsAppend.length());
+                fieldsAppend.append(">,\n");
+            }
+            sb.append(fieldsAppend);
+        }
+        sb.append("PRIMARY KEY (rowkey) NOT ENFORCED\n) ");
+        sb.append(parseOptions(node.tableOptions()));
+        return sb.toString();
+    }
+
+    private Map<String, List<FieldRelationShip>> genColumnFamilyMapFieldRelationShips(
+            List<FieldRelationShip> fieldRelationShips) {
+        Map<String, List<FieldRelationShip>> columnFamilyMapFields = new HashMap<>(16);
+        for (FieldRelationShip fieldRelationShip : fieldRelationShips) {
+            String columnFamily = fieldRelationShip.getOutputField().getName().split(":")[0];
+            columnFamilyMapFields.computeIfAbsent(columnFamily, v -> new ArrayList<>())
+                    .add(fieldRelationShip);
+        }
+        return columnFamilyMapFields;
+    }
+
     /**
     * Generate create transform sql
      *
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/HbaseLoadFlinkSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/HbaseLoadFlinkSqlParseTest.java
new file mode 100644
index 000000000..b2c2281e7
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/HbaseLoadFlinkSqlParseTest.java
@@ -0,0 +1,122 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link HbaseLoadNode}
+ */
+public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
+
+    /**
+     * Build a MySQL extract node
+     * @return MySQL extract node
+     */
+    private MySqlExtractNode buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, null,
+                Collections.singletonList("user"), "localhost", "root", "inlong",
+                "test", null, null,
+                false, null);
+    }
+
+    /**
+     * Build an HBase load node
+     * @return HBase load node
+     */
+    private HbaseLoadNode buildHbaseLoadNode() {
+        return new HbaseLoadNode("2", "test_hbase",
+                Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
+                        new StringFormatInfo())),
+                Arrays.asList(new FieldRelationShip(new FieldInfo("age", new LongFormatInfo()),
+                                new FieldInfo("cf:age", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("cf:name", new StringFormatInfo()))), null, null, 1, null, "mytable",
+                "default",
+                "localhost:2181", "MD5(`name`)", null, null, null, null);
+    }
+
+    /**
+     * Build the node relation between extract and load nodes
+     * @param inputs extract nodes
+     * @param outputs load nodes
+     * @return node relation
+     */
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    /**
+     * Test Flink SQL parsing where the extract node is MySQL {@link MySqlExtractNode} and the load node is HBase {@link HbaseLoadNode}
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testFlinkSqlParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode();
+        Node outputNode = buildHbaseLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+}
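
Tracing genCreateHbaseLoadSql and parseHbaseLoadFieldRelation over this test
topology, the HBase half of the job should come out roughly as the two
statements below (whitespace normalized, WITH-clause rendering of
parseOptions approximated):

    CREATE TABLE `mytable` (
        rowkey STRING,
        cf Row<age BIGINT, name STRING>,
        PRIMARY KEY (rowkey) NOT ENFORCED
    ) with (
        'connector' = 'hbase-2.2',
        'table-name' = 'default:mytable',
        'zookeeper.quorum' = 'localhost:2181'
    )

    INSERT INTO `mytable`
        SELECT MD5(`name`) as rowkey,
               ROW(age, name)
        FROM `mysql_input`
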
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index da4f61f3f..d314db5f0 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -423,6 +423,7 @@ The text of each license is the standard Apache 2.0 license.
   org.apache.flink:flink-connector-base:1.13.5 - Flink : Connectors : Base (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-base), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-connector-files:1.13.5 - Flink : Connectors : Files (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-files), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-connector-kafka_2.11:1.13.5 - Flink : Connectors : Kafka (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-kafka), (The Apache Software License, Version 2.0)
+  org.apache.flink:flink-connector-hbase-2.2_2.11:1.13.5 - Flink : Connectors : HBase 2.2 (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-hbase-2.2), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-file-sink-common:1.13.5 - Flink : Connectors : File Sink Common (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-file-sink-common), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-java:1.13.5 - Flink : Java (https://github.com/apache/flink/tree/release-1.13.5/flink-java), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-json:1.13.5 - Flink : Formats : Json (https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-json), (The Apache Software License, Version 2.0)
diff --git a/licenses/inlong-sort/NOTICE b/licenses/inlong-sort/NOTICE
index f04523951..2dc1bcab3 100644
--- a/licenses/inlong-sort/NOTICE
+++ b/licenses/inlong-sort/NOTICE
@@ -2086,5 +2086,20 @@ Apache Parquet Avro NOTICE
 
 The NOTICE content is too long  and is included at notices/NOTICE-[project].txt
 
+// ------------------------------------------------------------------
+// NOTICE file corresponding to the section 4d of The Apache License,
+// Version 2.0, in this case for Apache Flink
+// ------------------------------------------------------------------
+
+Apache Flink
+Copyright 2006-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Flink : Connectors : HBase 2.2
+Copyright 2014-2021 The Apache Software Foundation
+
 
 
diff --git a/pom.xml b/pom.xml
index c5cf9e101..3657a3e48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -882,6 +882,11 @@
                 <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
                 <version>${flink.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>