You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/11 12:23:30 UTC
[incubator-inlong] branch master updated: [INLONG-4156][Sort] Add HBase load node (#4158)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fc162105a [INLONG-4156][Sort] Add HBase load node (#4158)
fc162105a is described below
commit fc162105a6dfcd8dd27d2d41ee6347bf72f5b747
Author: pacino <ge...@gmail.com>
AuthorDate: Wed May 11 20:23:24 2022 +0800
[INLONG-4156][Sort] Add HBase load node (#4158)
---
.../sort/protocol/constant/HBaseConstant.java | 42 +++++++
.../apache/inlong/sort/protocol/node/LoadNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 10 +-
.../sort/protocol/node/load/HbaseLoadNode.java | 128 +++++++++++++++++++++
.../sort/protocol/node/load/HbaseLoadNodeTest.java | 38 ++++++
inlong-sort/sort-single-tenant/pom.xml | 5 +
.../flink/parser/impl/FlinkSqlParser.java | 82 ++++++++++++-
.../flink/parser/HbaseLoadFlinkSqlParseTest.java | 122 ++++++++++++++++++++
licenses/inlong-sort/LICENSE | 1 +
licenses/inlong-sort/NOTICE | 15 +++
pom.xml | 5 +
11 files changed, 442 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HBaseConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HBaseConstant.java
new file mode 100644
index 000000000..a16b9187d
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HBaseConstant.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * hbase option constant
+ */
+public class HBaseConstant {
+
+ public static final String CONNECTOR = "connector";
+
+ public static final String HBASE_2 = "hbase-2.2";
+
+ public static final String TABLE_NAME = "table-name";
+
+ public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
+
+ public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
+
+ public static final String SINK_BUFFER_FLUSH_MAX_SIZE = "sink.buffer-flush.max-size";
+
+ public static final String SINK_BUFFER_FLUSH_MAX_ROWS = "sink.buffer-flush.max-rows";
+
+ public static final String SINK_BUFFER_FLUSH_INTERVAL = "sink.buffer-flush.interval";
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index a87c5b7a4..89217573c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -27,6 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -43,7 +44,8 @@ import java.util.Map;
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
- @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad")
+ @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
+ @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
})
@NoArgsConstructor
@Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 210994ba8..4873f7725 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -17,9 +17,6 @@
package org.apache.inlong.sort.protocol.node;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -32,6 +29,11 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
@@ -42,7 +43,8 @@ import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
@JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
- @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad")
+ @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
+ @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
})
public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
new file mode 100644
index 000000000..3841454d9
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HBaseConstant;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hbase load node for generate hbase connector DDL
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("hbaseLoad")
+@Data
+@NoArgsConstructor
+public class HbaseLoadNode extends LoadNode implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @JsonProperty("tableName")
+ private String tableName;
+
+ @JsonProperty("nameSpace")
+ private String nameSpace;
+
+ @JsonProperty("zookeeperQuorum")
+ private String zookeeperQuorum;
+
+ @JsonProperty("rowKey")
+ private String rowKey;
+
+ @JsonProperty("sinkBufferFlushMaxSize")
+ private String sinkBufferFlushMaxSize;
+
+ @JsonProperty("zookeeperZnodeParent")
+ private String zookeeperZnodeParent;
+
+ @JsonProperty("sinkBufferFlushMaxRows")
+ private String sinkBufferFlushMaxRows;
+
+ @JsonProperty("sinkBufferFlushInterval")
+ private String sinkBufferFlushInterval;
+
+ @JsonCreator
+ public HbaseLoadNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("nameSpace") String nameSpace,
+ @JsonProperty("zookeeperQuorum") String zookeeperQuorum,
+ @JsonProperty("rowKey") String rowKey,
+ @JsonProperty("sinkBufferFlushMaxSize") String sinkBufferFlushMaxSize,
+ @JsonProperty("zookeeperZnodeParent") String zookeeperZnodeParent,
+ @JsonProperty("sinkBufferFlushMaxRows") String sinkBufferFlushMaxRows,
+ @JsonProperty("sinkBufferFlushInterval") String sinkBufferFlushInterval) {
+ super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ this.tableName = Preconditions.checkNotNull(tableName, "tableName of hbase is null");
+ this.nameSpace = Preconditions.checkNotNull(nameSpace, "nameSpace of hbase is null");
+ this.zookeeperQuorum = Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum of hbase is null");
+ this.rowKey = Preconditions.checkNotNull(rowKey, "rowKey of hbase is null");
+ this.sinkBufferFlushMaxSize = sinkBufferFlushMaxSize;
+ this.zookeeperZnodeParent = zookeeperZnodeParent;
+ this.sinkBufferFlushMaxRows = sinkBufferFlushMaxRows;
+ this.sinkBufferFlushInterval = sinkBufferFlushInterval;
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> map = super.tableOptions();
+ map.put(HBaseConstant.CONNECTOR, HBaseConstant.HBASE_2);
+ map.put(HBaseConstant.TABLE_NAME, nameSpace + ":" + tableName);
+ map.put(HBaseConstant.ZOOKEEPER_QUORUM, zookeeperQuorum);
+ if (StringUtils.isNotEmpty(sinkBufferFlushInterval)) {
+ map.put(HBaseConstant.SINK_BUFFER_FLUSH_INTERVAL, sinkBufferFlushInterval);
+ }
+ if (StringUtils.isNotEmpty(zookeeperZnodeParent)) {
+ map.put(HBaseConstant.ZOOKEEPER_ZNODE_PARENT, zookeeperZnodeParent);
+ }
+ if (StringUtils.isNotEmpty(sinkBufferFlushMaxRows)) {
+ map.put(HBaseConstant.SINK_BUFFER_FLUSH_MAX_ROWS, sinkBufferFlushMaxRows);
+ }
+ if (StringUtils.isNotEmpty(sinkBufferFlushMaxSize)) {
+ map.put(HBaseConstant.SINK_BUFFER_FLUSH_MAX_SIZE, sinkBufferFlushMaxSize);
+ }
+ return map;
+ }
+
+ @Override
+ public String genTableName() {
+ return this.tableName;
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
new file mode 100644
index 000000000..483d26757
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+
+import java.util.Arrays;
+
+public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
+
+ @Override
+ public HbaseLoadNode getTestObject() {
+ return new HbaseLoadNode("2", "test_hbase",
+ Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo())),
+ Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+ new FieldInfo("cf:id", new StringFormatInfo()))), null, null, 1, null, "mytable", "default",
+ "localhost:2181", "MD5(`id`)", null, null, null, null);
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index d9a7a4af7..13fb179a4 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -282,6 +282,11 @@
<version>${debezium-core.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index 07a7822ae..2a15caaec 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -493,16 +494,40 @@ public class FlinkSqlParser implements Parser {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO `").append(loadNode.genTableName()).append("` ");
sb.append("\n SELECT ");
- Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(loadNode.getFieldRelationShips().size());
- loadNode.getFieldRelationShips().forEach(s -> {
- fieldRelationMap.put(s.getOutputField().getName(), s);
- });
- parseFieldRelations(loadNode.getFields(), fieldRelationMap, sb);
+ if (loadNode instanceof HbaseLoadNode) {
+ parseHbaseLoadFieldRelation((HbaseLoadNode) loadNode, sb);
+ } else {
+ Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(loadNode.getFieldRelationShips().size());
+ loadNode.getFieldRelationShips().forEach(s -> {
+ fieldRelationMap.put(s.getOutputField().getName(), s);
+ });
+ parseFieldRelations(loadNode.getFields(), fieldRelationMap, sb);
+ }
sb.append("\n FROM `").append(inputNode.genTableName()).append("`");
parseFilterFields(loadNode.getFilterStrategy(), loadNode.getFilters(), sb);
return sb.toString();
}
+ private void parseHbaseLoadFieldRelation(HbaseLoadNode hbaseLoadNode, StringBuilder sb) {
+ sb.append(hbaseLoadNode.getRowKey()).append(" as rowkey,\n");
+ List<FieldRelationShip> fieldRelationShips = hbaseLoadNode.getFieldRelationShips();
+ Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
+ fieldRelationShips);
+ for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+ StringBuilder fieldAppend = new StringBuilder(" ROW(");
+ for (FieldRelationShip fieldRelationShip : entry.getValue()) {
+ FieldInfo fieldInfo = (FieldInfo) fieldRelationShip.getInputField();
+ fieldAppend.append(fieldInfo.getName()).append(",");
+ }
+ if (fieldAppend.length() > 0) {
+ fieldAppend.delete(fieldAppend.lastIndexOf(","), fieldAppend.length());
+ }
+ fieldAppend.append("),");
+ sb.append(fieldAppend);
+ }
+ sb.delete(sb.lastIndexOf(","), sb.length());
+ }
+
/**
* Generate create sql
*
@@ -513,6 +538,9 @@ public class FlinkSqlParser implements Parser {
if (node instanceof TransformNode) {
return genCreateTransformSql(node);
}
+ if (node instanceof HbaseLoadNode) {
+ return genCreateHbaseLoadSql((HbaseLoadNode) node);
+ }
StringBuilder sb = new StringBuilder("CREATE TABLE `");
sb.append(node.genTableName()).append("`(\n");
sb.append(genPrimaryKey(node.getPrimaryKey()));
@@ -532,6 +560,50 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
+ /**
+ * gen create table DDL for hbase load
+ *
+ * @param node
+ * @return
+ */
+ private String genCreateHbaseLoadSql(HbaseLoadNode node) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE `");
+ sb.append(node.genTableName()).append("`(\n");
+ sb.append("rowkey STRING,\n");
+ List<FieldRelationShip> fieldRelationShips = node.getFieldRelationShips();
+ Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
+ fieldRelationShips);
+ for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+ sb.append(entry.getKey());
+ StringBuilder fieldsAppend = new StringBuilder(" Row<");
+ for (FieldRelationShip fieldRelationShip : entry.getValue()) {
+ FieldInfo fieldInfo = fieldRelationShip.getOutputField();
+ fieldsAppend.append(fieldInfo.getName().split(":")[1]).append(" ")
+ .append(TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()).asSummaryString())
+ .append(",");
+ }
+ if (fieldsAppend.length() > 0) {
+ fieldsAppend.delete(fieldsAppend.lastIndexOf(","), fieldsAppend.length());
+ fieldsAppend.append(">,\n");
+ }
+ sb.append(fieldsAppend);
+ }
+ sb.append("PRIMARY KEY (rowkey) NOT ENFORCED\n) ");
+ sb.append(parseOptions(node.tableOptions()));
+ return sb.toString();
+ }
+
+ private Map<String, List<FieldRelationShip>> genColumnFamilyMapFieldRelationShips(
+ List<FieldRelationShip> fieldRelationShips) {
+ Map<String, List<FieldRelationShip>> columnFamilyMapFields = new HashMap<>(16);
+ for (FieldRelationShip fieldRelationShip : fieldRelationShips) {
+ String columnFamily = fieldRelationShip.getOutputField().getName().split(":")[0];
+ columnFamilyMapFields.computeIfAbsent(columnFamily, v -> new ArrayList<>())
+ .add(fieldRelationShip);
+ }
+ return columnFamilyMapFields;
+ }
+
/**
* Genrate create transform sql
*
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/HbaseLoadFlinkSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/HbaseLoadFlinkSqlParseTest.java
new file mode 100644
index 000000000..b2c2281e7
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/HbaseLoadFlinkSqlParseTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link HbaseLoadNode}
+ */
+public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
+
+ /**
+ * build mysql extract node
+ * @return Mysql extract node
+ */
+ private MySqlExtractNode buildMySQLExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, null,
+ Collections.singletonList("user"), "localhost", "root", "inlong",
+ "test", null, null,
+ false, null);
+ }
+
+ /**
+ * build hbase load node
+ * @return hbase load node
+ */
+ private HbaseLoadNode buildHbaseLoadNode() {
+ return new HbaseLoadNode("2", "test_hbase",
+ Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
+ new StringFormatInfo())),
+ Arrays.asList(new FieldRelationShip(new FieldInfo("age", new LongFormatInfo()),
+ new FieldInfo("cf:age", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("cf:name", new StringFormatInfo()))), null, null, 1, null, "mytable",
+ "default",
+ "localhost:2181", "MD5(`name`)", null, null, null, null);
+ }
+
+ /**
+ * build node relation
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Test flink sql task for extract is mysql {@link MySqlExtractNode} and load is hbase {@link HbaseLoadNode}
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testFlinkSqlParse() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildMySQLExtractNode();
+ Node outputNode = buildHbaseLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ FlinkSqlParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+}
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index da4f61f3f..d314db5f0 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -423,6 +423,7 @@ The text of each license is the standard Apache 2.0 license.
org.apache.flink:flink-connector-base:1.13.5 - Flink : Connectors : Base (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-base), (The Apache Software License, Version 2.0)
org.apache.flink:flink-connector-files:1.13.5 - Flink : Connectors : Files (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-files), (The Apache Software License, Version 2.0)
org.apache.flink:flink-connector-kafka_2.11:1.13.5 - Flink : Connectors : Kafka (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-kafka), (The Apache Software License, Version 2.0)
+ org.apache.flink:flink-connector-hbase-2.2_2.11 - Flink : Connectors : HBase 2.2 (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-hbase-2.2), (The Apache Software License, Version 2.0)
org.apache.flink:flink-file-sink-common:1.13.5 - Flink : Connectors : File Sink Common (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-file-sink-common), (The Apache Software License, Version 2.0)
org.apache.flink:flink-java:1.13.5 - Flink : Java (https://github.com/apache/flink/tree/release-1.13.5/flink-java), (The Apache Software License, Version 2.0)
org.apache.flink:flink-json:1.13.5 - Flink : Formats : Json (https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-json), (The Apache Software License, Version 2.0)
diff --git a/licenses/inlong-sort/NOTICE b/licenses/inlong-sort/NOTICE
index f04523951..2dc1bcab3 100644
--- a/licenses/inlong-sort/NOTICE
+++ b/licenses/inlong-sort/NOTICE
@@ -2086,5 +2086,20 @@ Apache Parquet Avro NOTICE
The NOTICE content is too long and is included at notices/NOTICE-[project].txt
+// ------------------------------------------------------------------
+// NOTICE file corresponding to the section 4d of The Apache License,
+// Version 2.0, in this case for Apache Flink
+// ------------------------------------------------------------------
+
+Apache Flink
+Copyright 2006-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Flink : Connectors : HBase 2.2
+Copyright 2014-2021 The Apache Software Foundation
+
diff --git a/pom.xml b/pom.xml
index c5cf9e101..3657a3e48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -882,6 +882,11 @@
<artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>