You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/31 23:56:11 UTC
[pulsar] branch master updated: Issue #3290 Pulsar IO connector for
Hbase sink (#3368)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9871200 Issue #3290 Pulsar IO connector for Hbase sink (#3368)
9871200 is described below
commit 98712008cb83c0157fc2c59e6e85eb1542312883
Author: wpl <12...@qq.com>
AuthorDate: Fri Feb 1 07:56:06 2019 +0800
Issue #3290 Pulsar IO connector for Hbase sink (#3368)
Supporting the ability to write data to HBase sink #3290
---
distribution/io/src/assemble/io.xml | 6 +
pom.xml | 1 +
pulsar-io/hbase/pom.xml | 95 ++++++++++
.../pulsar/io/hbase/HbaseAbstractConfig.java | 81 ++++++++
.../pulsar/io/hbase/sink/HbaseAbstractSink.java | 204 +++++++++++++++++++++
.../io/hbase/sink/HbaseGenericRecordSink.java | 89 +++++++++
.../pulsar/io/hbase/sink/HbaseSinkConfig.java | 99 ++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 +++
.../org/apache/pulsar/io/hbase/TableUtils.java | 52 ++++++
.../io/hbase/sink/HbaseGenericRecordSinkTest.java | 172 +++++++++++++++++
.../pulsar/io/hbase/sink/HbaseSinkConfigTest.java | 155 ++++++++++++++++
.../hbase/src/test/resources/hbase/hbase-site.xml | 42 +++++
pulsar-io/hbase/src/test/resources/sinkConfig.yaml | 29 +++
pulsar-io/pom.xml | 1 +
site2/docs/io-connectors.md | 1 +
site2/docs/io-hbase.md | 27 +++
16 files changed, 1076 insertions(+)
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 10ea149..f688383 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -105,5 +105,11 @@
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
+
+ <!-- Copy the pulsar-io-hbase NAR into the "connectors" output directory. -->
+ <file>
+ <source>${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar</source>
+ <outputDirectory>connectors</outputDirectory>
+ <fileMode>644</fileMode>
+ </file>
</files>
</assembly>
diff --git a/pom.xml b/pom.xml
index 94dee51..5b6f633 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,6 +186,7 @@ flexible messaging model and an intuitive client API.</description>
<jsonwebtoken.version>0.10.5</jsonwebtoken.version>
<opencensus.version>0.12.3</opencensus.version>
<zstd.version>1.3.7-3</zstd.version>
+ <hbase.version>1.4.9</hbase.version>
<!-- test dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
diff --git a/pulsar-io/hbase/pom.xml b/pulsar-io/hbase/pom.xml
new file mode 100644
index 0000000..ca128bd
--- /dev/null
+++ b/pulsar-io/hbase/pom.xml
@@ -0,0 +1,95 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Child module of pulsar-io; groupId and version come from the parent. -->
+ <parent>
+ <artifactId>pulsar-io</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <version>2.3.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>pulsar-io-hbase</artifactId>
+ <name>Pulsar IO :: Hbase</name>
+
+ <dependencies>
+ <!-- Pulsar modules, pinned to the same version as this module. -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- No explicit versions below; presumably managed by a parent POM (TODO confirm). -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <!-- HBase libraries, all pinned to the hbase.version property. -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <!-- NOTE(review): the connector sources only import HBase client/common classes; confirm hbase-server is really required. -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- NAR packaging plugin; version and configuration presumably inherited from a parent POM (TODO confirm). -->
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
new file mode 100644
index 0000000..a9f362d
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+/**
+ * Connection-level configuration shared by Hbase connector components:
+ * the ZooKeeper settings used to reach the cluster and the target table name.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class HbaseAbstractConfig implements Serializable {
+
+ private static final long serialVersionUID = 6783394446906640112L;
+
+ // Optional Hadoop configuration resource (e.g. hbase-site.xml); not checked by validate().
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "hbase system configuration 'hbase-site.xml' file")
+ private String hbaseConfigResources;
+
+ // Value for hbase.zookeeper.quorum; required and has no default.
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "hbase system configuration about hbase.zookeeper.quorum value")
+ private String zookeeperQuorum;
+
+ // Value for hbase.zookeeper.property.clientPort; kept as a String, defaults to "2181".
+ @FieldDoc(
+ required = false,
+ defaultValue = "2181",
+ help = "hbase system configuration about hbase.zookeeper.property.clientPort value")
+ private String zookeeperClientPort = "2181";
+
+ // Value for zookeeper.znode.parent; defaults to "/hbase".
+ @FieldDoc(
+ required = false,
+ defaultValue = "/hbase",
+ help = "hbase system configuration about zookeeper.znode.parent value")
+ private String zookeeperZnodeParent = "/hbase";
+
+ // Target table, written as namespace:tableName; required and has no default.
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "hbase table, value is namespace:tableName, namespace default value is default")
+ private String tableName;
+
+ /**
+ * Checks that the ZooKeeper quorum, client port, znode parent and table name are all non-null.
+ * Only null is rejected; empty strings pass this check.
+ *
+ * @throws NullPointerException if any of those four properties is null
+ */
+ public void validate() {
+ Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum property not set.");
+ Preconditions.checkNotNull(zookeeperClientPort, "zookeeperClientPort property not set.");
+ Preconditions.checkNotNull(zookeeperZnodeParent, "zookeeperZnodeParent property not set.");
+ Preconditions.checkNotNull(tableName, "hbase tableName property not set.");
+ }
+}
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
new file mode 100644
index 0000000..17ff802
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for Hbase sinks that buffer incoming records and write them to one Hbase
+ * table in batches.
+ *
+ * <p>Records passed to {@link #write(Record)} are queued in memory. The queue is flushed on a
+ * single background thread, either every {@code batchTimeMs} milliseconds or as soon as it
+ * holds {@code batchSize} records. A record is acknowledged once its batch has been written
+ * and failed when binding or writing it throws.
+ *
+ * <p>Subclasses implement {@link #bindValue(Record, List)} to turn a record into Hbase
+ * {@link Put} operations.
+ */
+@Slf4j
+public abstract class HbaseAbstractSink<T> implements Sink<T> {
+
+    /**
+     * Names taken from the sink configuration that tell {@link #bindValue(Record, List)}
+     * which record field is the row key, which column family to write to and which
+     * record fields become column qualifiers.
+     */
+    @Data(staticConstructor = "of")
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class TableDefinition {
+        private final String rowKeyName;
+        private final String familyName;
+        private final List<String> qualifierNames;
+    }
+
+    // Upper bound on how long close() waits for queued flushes to finish.
+    private static final long CLOSE_TIMEOUT_SECONDS = 30;
+
+    private HbaseSinkConfig hbaseSinkConfig;
+    private Configuration configuration;
+    private Connection connection;
+    private Admin admin;
+    private TableName tableName;
+    private Table table;
+
+    protected TableDefinition tableDefinition;
+
+    // for flush
+    private long batchTimeMs;
+    private int batchSize;
+    private List<Record<T>> incomingList;
+    private ScheduledExecutorService flushExecutor;
+
+    /**
+     * Loads and validates the configuration, connects to Hbase, resolves the target table
+     * and starts the periodic flush task.
+     *
+     * @param config      raw connector configuration, mapped onto {@link HbaseSinkConfig}
+     * @param sinkContext sink context supplied by the runtime (not used here)
+     * @throws Exception if the configuration is invalid, the connection cannot be created
+     *                   or the configured table does not exist
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        hbaseSinkConfig = HbaseSinkConfig.load(config);
+        // Fail fast, before any connection is opened. validate() covers the same null checks
+        // that used to be repeated here and also rejects non-positive batch settings, which
+        // scheduleAtFixedRate would otherwise only reject after the connection was created.
+        hbaseSinkConfig.validate();
+
+        try {
+            getTable(hbaseSinkConfig);
+        } catch (Exception e) {
+            // Do not leak a half-initialised connection when the table lookup fails.
+            try {
+                closeHbaseResources();
+            } catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        tableDefinition = getTableDefinition(hbaseSinkConfig);
+
+        batchTimeMs = hbaseSinkConfig.getBatchTimeMs();
+        batchSize = hbaseSinkConfig.getBatchSize();
+        incomingList = Lists.newArrayList();
+        // A single thread, so flushes never run concurrently against the table handle.
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(() -> flush(), batchTimeMs, batchTimeMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Drains the buffered records, stops the flush thread and releases the Hbase resources.
+     *
+     * @throws Exception if waiting for the flush thread is interrupted or closing an
+     *                   Hbase resource fails
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (null != flushExecutor && !flushExecutor.isShutdown()) {
+                // Queue one last flush on the flush thread so buffered records are not silently
+                // dropped, then stop the periodic task and wait for the queued work to finish
+                // before the table and connection are closed underneath it.
+                flushExecutor.submit(() -> flush());
+                flushExecutor.shutdown();
+                if (!flushExecutor.awaitTermination(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                    log.warn("Hbase flush did not finish within {} seconds, forcing shutdown",
+                            CLOSE_TIMEOUT_SECONDS);
+                    flushExecutor.shutdownNow();
+                }
+            }
+        } finally {
+            closeHbaseResources();
+        }
+    }
+
+    /**
+     * Queues a record for the next batch and triggers a flush when the batch is full.
+     *
+     * @param record the record to write; {@code null} is ignored
+     */
+    @Override
+    public void write(Record<T> record) throws Exception {
+        int number;
+        synchronized (this) {
+            if (null != record) {
+                incomingList.add(record);
+            }
+            number = incomingList.size();
+        }
+
+        // The queue grows by one record per call, so it always passes through exactly
+        // batchSize; the flush itself runs on the flush thread.
+        if (number == batchSize) {
+            flushExecutor.submit(() -> flush());
+        }
+    }
+
+    /**
+     * Flushes the queued records and never throws: an exception escaping a task scheduled
+     * with scheduleAtFixedRate would cancel every later periodic flush.
+     */
+    private void flush() {
+        try {
+            flushPending();
+        } catch (Exception e) {
+            log.error("Unexpected exception while flushing records to Hbase ", e);
+        }
+    }
+
+    /**
+     * Takes ownership of the queued records, binds each one to its puts, writes the puts
+     * and then acks or fails the records.
+     */
+    private void flushPending() {
+        List<Record<T>> toFlushList;
+        synchronized (this) {
+            if (incomingList.isEmpty()) {
+                return;
+            }
+            // Swap the queue so writers are only blocked for the duration of the swap.
+            toFlushList = incomingList;
+            incomingList = Lists.newArrayList();
+        }
+
+        List<Put> puts = new ArrayList<>();
+        // Records that bound successfully are collected in a separate list; removing from
+        // toFlushList while iterating it would throw ConcurrentModificationException or skip
+        // a record.
+        List<Record<T>> boundRecords = new ArrayList<>(toFlushList.size());
+        for (Record<T> record : toFlushList) {
+            // Bind into a scratch list so a record that fails half-way contributes no puts.
+            List<Put> recordPuts = new ArrayList<>();
+            try {
+                bindValue(record, recordPuts);
+            } catch (Exception e) {
+                log.warn("Record flush thread was exception ", e);
+                failQuietly(record);
+                continue;
+            }
+            puts.addAll(recordPuts);
+            boundRecords.add(record);
+        }
+
+        try {
+            if (CollectionUtils.isNotEmpty(puts)) {
+                table.put(puts);
+                // NOTE(review): this asks the cluster to flush the table after every batch,
+                // which looks expensive; confirm it is really required before removing it.
+                admin.flush(tableName);
+            }
+
+            boundRecords.forEach(tRecord -> tRecord.ack());
+        } catch (Exception e) {
+            log.error("Hbase table put data exception ", e);
+            boundRecords.forEach(this::failQuietly);
+        }
+    }
+
+    /**
+     * Fails a record without letting a throwing fail callback abort the handling of the
+     * remaining records of the batch.
+     */
+    private void failQuietly(Record<T> record) {
+        try {
+            record.fail();
+        } catch (Exception e) {
+            log.warn("Failed to mark record as failed ", e);
+        }
+    }
+
+    /**
+     * Closes table, admin and connection in that order; each one is attempted even when an
+     * earlier close throws.
+     */
+    private void closeHbaseResources() throws IOException {
+        try {
+            if (null != table) {
+                table.close();
+            }
+        } finally {
+            try {
+                if (null != admin) {
+                    admin.close();
+                }
+            } finally {
+                if (null != connection) {
+                    connection.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Binds one record to the Hbase puts that should be written for it.
+     *
+     * @param message the record to convert
+     * @param puts    list to append the resulting puts to
+     * @throws Exception if the record cannot be converted; the record is then failed
+     */
+    public abstract void bindValue(Record<T> message, List<Put> puts) throws Exception;
+
+    /**
+     * Builds the Hbase configuration, opens the connection and resolves the target table.
+     *
+     * @throws IOException              if the connection or table cannot be obtained
+     * @throws IllegalArgumentException if the configured table does not exist
+     */
+    private void getTable(HbaseSinkConfig hbaseSinkConfig) throws IOException {
+        configuration = HBaseConfiguration.create();
+        String hbaseConfigResources = hbaseSinkConfig.getHbaseConfigResources();
+        if (StringUtils.isNotBlank(hbaseConfigResources)) {
+            configuration.addResource(hbaseConfigResources);
+        }
+
+        // Explicit connector settings are applied after the optional resource is registered.
+        configuration.set("hbase.zookeeper.quorum", hbaseSinkConfig.getZookeeperQuorum());
+        configuration.set("hbase.zookeeper.property.clientPort", hbaseSinkConfig.getZookeeperClientPort());
+        configuration.set("zookeeper.znode.parent", hbaseSinkConfig.getZookeeperZnodeParent());
+
+        connection = ConnectionFactory.createConnection(configuration);
+        admin = connection.getAdmin();
+        tableName = TableName.valueOf(hbaseSinkConfig.getTableName());
+        if (!admin.tableExists(this.tableName)) {
+            throw new IllegalArgumentException(this.tableName + " table does not exist.");
+        }
+
+        table = connection.getTable(this.tableName);
+    }
+
+    /**
+     * Builds the {@link TableDefinition} from the row key, family and qualifier names of the
+     * sink configuration.
+     *
+     * @throws NullPointerException if any of the three settings is missing
+     */
+    private TableDefinition getTableDefinition(HbaseSinkConfig hbaseSinkConfig) throws Exception {
+        Preconditions.checkNotNull(hbaseSinkConfig.getRowKeyName(), "rowKeyName property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getFamilyName(), "familyName property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getQualifierNames(), "qualifierNames property not set.");
+
+        return TableDefinition.of(hbaseSinkConfig.getRowKeyName(), hbaseSinkConfig.getFamilyName(),
+                hbaseSinkConfig.getQualifierNames());
+    }
+}
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
new file mode 100644
index 0000000..3027e34
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.util.List;
+
+/**
+ * Hbase sink for {@link GenericRecord} values.
+ *
+ * <p>The configured row key field supplies the row key and every configured qualifier field
+ * that is present on the record is written as a column of the configured family.
+ */
+@Connector(
+    name = "hbase",
+    type = IOType.SINK,
+    help = "The HbaseGenericRecordSink is used for moving messages from Pulsar to Hbase.",
+    configClass = HbaseSinkConfig.class
+)
+@Slf4j
+public class HbaseGenericRecordSink extends HbaseAbstractSink<GenericRecord> {
+
+    /**
+     * Converts one record into a single {@link Put} holding all of its non-null qualifier
+     * values, so the columns of a row are written together rather than as one put per
+     * column. Records without any non-null qualifier value add nothing to {@code puts}.
+     *
+     * @param message the record to convert
+     * @param puts    list the resulting put is appended to
+     * @throws Exception if the row key is missing or a value has an unsupported type
+     */
+    @Override
+    public void bindValue(Record<GenericRecord> message, List<Put> puts) throws Exception {
+        GenericRecord record = message.getValue();
+
+        String rowKeyName = tableDefinition.getRowKeyName();
+        Object rowKeyValue = record.getField(rowKeyName);
+
+        // The family name comes from the sink configuration; encode it once per record.
+        byte[] familyValueBytes = getBytes(tableDefinition.getFamilyName());
+
+        Put put = null;
+        for (String qualifierName : tableDefinition.getQualifierNames()) {
+            Object qualifierValue = record.getField(qualifierName);
+            if (null == qualifierValue) {
+                continue;
+            }
+            if (null == put) {
+                // The row key is only needed, and only checked, once there is something to
+                // write; records with no values are still skipped without an error.
+                if (null == rowKeyValue) {
+                    throw new IllegalArgumentException(
+                            "Row key field '" + rowKeyName + "' is missing or null in the record.");
+                }
+                put = new Put(getBytes(rowKeyValue));
+            }
+            put.addColumn(familyValueBytes, getBytes(qualifierName), getBytes(qualifierValue));
+        }
+
+        if (null != put) {
+            puts.add(put);
+        }
+    }
+
+    /**
+     * Encodes a supported value (Integer, Long, Double, Float, Boolean, String or Short) as
+     * the byte array stored in Hbase.
+     *
+     * @param value the value to encode
+     * @return the encoded bytes
+     * @throws Exception if the value is null or of an unsupported type
+     */
+    private byte[] getBytes(Object value) throws Exception {
+        if (null == value) {
+            // Reported explicitly; the unsupported-type branch below would otherwise fail
+            // with a bare NullPointerException while building its message.
+            throw new IllegalArgumentException("Cannot encode a null value.");
+        }
+        if (value instanceof Integer) {
+            return IntSchema.of().encode((Integer) value);
+        } else if (value instanceof Long) {
+            return LongSchema.of().encode((Long) value);
+        } else if (value instanceof Double) {
+            return DoubleSchema.of().encode((Double) value);
+        } else if (value instanceof Float) {
+            return FloatSchema.of().encode((Float) value);
+        } else if (value instanceof Boolean) {
+            return Bytes.toBytes((Boolean) value);
+        } else if (value instanceof String) {
+            return StringSchema.utf8().encode((String) value);
+        } else if (value instanceof Short) {
+            return ShortSchema.of().encode((Short) value);
+        } else {
+            throw new Exception("Not support value type, need to add it. " + value.getClass());
+        }
+    }
+}
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
new file mode 100644
index 0000000..b2e2534
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.apache.pulsar.io.hbase.HbaseAbstractConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Configuration of the Hbase sink: the inherited connection settings plus the mapping of
+ * record fields to row key and columns, and the batching parameters.
+ */
+@Data
+@Setter
+@Getter
+// Include the inherited connection settings, so two configurations that point at different
+// clusters or tables are neither equal nor printed identically.
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@Accessors(chain = true)
+public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 1245636479605735555L;
+
+    // Name of the record field whose value is used as the row key.
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hbase table rowkey name")
+    private String rowKeyName;
+
+    // Column family that all values are written to.
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hbase table column family name")
+    private String familyName;
+
+    // Record field names that are written as column qualifiers.
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hbase table column qualifier names")
+    private List<String> qualifierNames;
+
+    // Interval between periodic flushes, in milliseconds. The documented default is the plain
+    // number: the Java literal suffix is not part of the value a user would configure.
+    @FieldDoc(
+        required = false,
+        defaultValue = "1000",
+        help = "The hbase operation time in milliseconds")
+    private long batchTimeMs = 1000L;
+
+    // Number of buffered records that triggers a flush.
+    @FieldDoc(
+        required = false,
+        defaultValue = "200",
+        help = "The batch size of write to the hbase table"
+    )
+    private int batchSize = 200;
+
+    /**
+     * Loads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or mapped onto this class
+     */
+    public static HbaseSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), HbaseSinkConfig.class);
+    }
+
+    /**
+     * Loads the configuration from a key/value map by round-tripping it through JSON.
+     *
+     * @param map configuration keyed by property name
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or mapped onto this class
+     */
+    public static HbaseSinkConfig load(Map<String, Object> map) throws IOException {
+        // One mapper serves both directions of the round trip.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), HbaseSinkConfig.class);
+    }
+
+    /**
+     * Validates the inherited connection settings, the row key, family and qualifier names,
+     * and the batch settings.
+     *
+     * @throws NullPointerException     if a required property is null
+     * @throws IllegalArgumentException if batchTimeMs or batchSize is not positive
+     */
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkNotNull(rowKeyName, "rowKeyName property not set.");
+        Preconditions.checkNotNull(familyName, "familyName property not set.");
+        Preconditions.checkNotNull(qualifierNames, "qualifierNames property not set.");
+        Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
+        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
+    }
+}
diff --git a/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..e0082d2
--- /dev/null
+++ b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Connector descriptor: connector name, short description and the sink implementation class.
+name: hbase
+description: Writes data into hbase table
+sinkClass: org.apache.pulsar.io.hbase.sink.HbaseGenericRecordSink
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/TableUtils.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/TableUtils.java
new file mode 100644
index 0000000..7c3c78e
--- /dev/null
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/TableUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test helper that opens an Hbase connection from a connector-style configuration map and
+ * returns a handle to the configured table.
+ */
+public class TableUtils {
+
+    /**
+     * Connects to Hbase using the "zookeeperQuorum", "zookeeperClientPort" and
+     * "zookeeperZnodeParent" entries of {@code config} and returns the table named by its
+     * "tableName" entry.
+     *
+     * <p>The connection backing the returned table is left open because the table is
+     * obtained from it; it is only closed here when the lookup fails.
+     *
+     * @param config configuration map; all four entries must be present
+     * @return a handle to the configured table
+     * @throws IOException              if the connection or table cannot be obtained
+     * @throws IllegalArgumentException if the table does not exist
+     */
+    public static Table getTable(Map<String, Object> config) throws IOException {
+        Configuration configuration = HBaseConfiguration.create();
+        configuration.set("hbase.zookeeper.quorum", config.get("zookeeperQuorum").toString());
+        configuration.set("hbase.zookeeper.property.clientPort", config.get("zookeeperClientPort").toString());
+        configuration.set("zookeeper.znode.parent", config.get("zookeeperZnodeParent").toString());
+
+        Connection connection = ConnectionFactory.createConnection(configuration);
+        try {
+            TableName tableName = TableName.valueOf(config.get("tableName").toString());
+            // The admin handle is only needed for the existence check, so release it right away.
+            try (Admin admin = connection.getAdmin()) {
+                if (!admin.tableExists(tableName)) {
+                    throw new IllegalArgumentException(tableName + " table does not exist.");
+                }
+            }
+            return connection.getTable(tableName);
+        } catch (IOException | RuntimeException e) {
+            // Nothing is handed to the caller on failure, so the connection must not leak.
+            try {
+                connection.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+}
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
new file mode 100644
index 0000000..2ccb08d
--- /dev/null
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hbase.TableUtils;
+import org.mockito.Mock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for {@link HbaseGenericRecordSink}: pushes one Avro-encoded record through the sink
+ * and reads the row back from the Hbase table to verify the stored values.
+ */
+@Slf4j
+public class HbaseGenericRecordSinkTest {
+
+ /**
+ * Payload type of the test record: rowKey supplies the row key, the other fields are
+ * written as column qualifiers.
+ */
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Foo {
+ private String rowKey;
+ private String name;
+ private String address;
+ private int age;
+ private boolean flag;
+ }
+
+ // Field names of Foo, used as the row key name and the qualifier names; "info" is the column family.
+ private String rowKeyName = "rowKey";
+ private String familyName = "info";
+ private String name = "name";
+ private String address = "address";
+ private String age = "age";
+ private String flag = "flag";
+ // Assigned manually with mock(SinkContext.class) inside the test.
+ @Mock
+ protected SinkContext mockSinkContext;
+
+ // Disabled; it targets ZooKeeper at localhost:2181 and table default:pulsar_hbase,
+ // so it presumably needs a running Hbase instance (TODO confirm).
+ @Test(enabled = false)
+ public void TestOpenAndWriteSink() throws Exception {
+ // Sink configuration; the same map is later handed to MessageImpl and to TableUtils.
+ Map<String, Object> map = new HashMap<>();
+ map.put("zookeeperQuorum", "localhost");
+ map.put("zookeeperClientPort", "2181");
+ map.put("zookeeperZnodeParent", "/hbase");
+ map.put("tableName", "default:pulsar_hbase");
+ map.put("rowKeyName", rowKeyName);
+ map.put("familyName", familyName);
+
+ List<String> qualifierNames = new ArrayList<>();
+ qualifierNames.add(name);
+ qualifierNames.add(address);
+ qualifierNames.add(age);
+ qualifierNames.add(flag);
+ map.put("qualifierNames",qualifierNames);
+
+ mockSinkContext = mock(SinkContext.class);
+ HbaseGenericRecordSink sink = new HbaseGenericRecordSink();
+
+ // prepare a foo Record
+ Foo obj = new Foo();
+ obj.setRowKey("rowKey_value");
+ obj.setName("name_value");
+ obj.setAddress("address_value");
+ obj.setAge(30);
+ obj.setFlag(true);
+ AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+
+ // Encode the object with its Avro schema and expose it to the sink as a GenericRecord.
+ byte[] bytes = schema.encode(obj);
+ ByteBuf payload = Unpooled.copiedBuffer(bytes);
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+ autoConsumeSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+
+ PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
+ Consumer consumer = mock(Consumer.class);
+ // NOTE(review): the sink configuration map is passed as the third MessageImpl argument
+ // (looks like the message properties); confirm this is intended.
+ Message<GenericRecord> message = new MessageImpl("fake_topic_name", "11:111", map, payload, autoConsumeSchema);
+ // Ack goes to the mocked consumer; fail throws only for EFFECTIVELY_ONCE processing.
+ Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
+ .message(message)
+ .topicName("fake_topic_name")
+ .ackFunction(() -> {
+ if (pulsarSourceConfig
+ .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ consumer.acknowledgeCumulativeAsync(message);
+ } else {
+ consumer.acknowledgeAsync(message);
+ }
+ }).failFunction(() -> {
+ if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new RuntimeException("Failed to process message: " + message.getMessageId());
+ }
+ })
+ .build();
+
+ log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+ obj.toString(),
+ message.getValue().toString(),
+ record.getValue().toString());
+
+ // change batchSize to 1, to flush on each write.
+ map.put("batchTimeMs", 1);
+ map.put("batchSize", 1);
+ // open should success
+ sink.open(map,mockSinkContext);
+
+ // write should success.
+ sink.write(record);
+ log.info("executed write");
+ // sleep to wait backend flush complete
+ Thread.sleep(500);
+
+ // value has been written to hbase table, read it out and verify.
+ Table table = TableUtils.getTable(map);
+ Get scan = new Get(Bytes.toBytes(obj.getRowKey()));
+ Result result = table.get(scan);
+ byte[] byteName = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(name));
+ byte[] byteAddress = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(address));
+ byte[] byteAge = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(age));
+ byte[] byteFlag = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(flag));
+ // Decode each cell back to its Java type and compare with the original object.
+ Assert.assertEquals(obj.getName(), Bytes.toString(byteName));
+ Assert.assertEquals(obj.getAddress(), Bytes.toString(byteAddress));
+ Assert.assertEquals(obj.getAge(), Bytes.toInt(byteAge));
+ Assert.assertEquals(obj.isFlag(), Bytes.toBoolean(byteFlag));
+
+ table.close();
+ sink.close();
+ }
+
+}
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfigTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfigTest.java
new file mode 100644
index 0000000..8312bad
--- /dev/null
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfigTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * HbaseSinkConfig test
+ */
+public class HbaseSinkConfigTest {
+
+ @Test
+ public final void loadFromYamlFileTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ HbaseSinkConfig config = HbaseSinkConfig.load(path);
+ assertNotNull(config);
+ assertEquals("hbase-site.xml", config.getHbaseConfigResources());
+ assertEquals("localhost", config.getZookeeperQuorum());
+ assertEquals("2181", config.getZookeeperClientPort());
+ assertEquals("/hbase", config.getZookeeperZnodeParent());
+ assertEquals("pulsar_hbase", config.getTableName());
+ assertEquals("rowKey", config.getRowKeyName());
+ assertEquals("info", config.getFamilyName());
+
+ List<String> qualifierNames = new ArrayList<>();
+ qualifierNames.add("name");
+ qualifierNames.add("address");
+ qualifierNames.add("age");
+ assertEquals(qualifierNames, config.getQualifierNames());
+ }
+
+ @Test
+ public final void loadFromMapTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("hbaseConfigResources", "hbase-site.xml");
+ map.put("zookeeperQuorum", "localhost");
+ map.put("zookeeperClientPort", "2181");
+ map.put("zookeeperZnodeParent", "/hbase");
+ map.put("tableName", "pulsar_hbase");
+ map.put("rowKeyName", "rowKey");
+ map.put("familyName", "info");
+
+ HbaseSinkConfig config = HbaseSinkConfig.load(map);
+ assertNotNull(config);
+ assertEquals("hbase-site.xml", config.getHbaseConfigResources());
+ assertEquals("localhost", config.getZookeeperQuorum());
+ assertEquals("2181", config.getZookeeperClientPort());
+ assertEquals("/hbase", config.getZookeeperZnodeParent());
+ assertEquals("pulsar_hbase", config.getTableName());
+ assertEquals("rowKey", config.getRowKeyName());
+ assertEquals("info", config.getFamilyName());
+
+ }
+
+ @Test
+ public final void validValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("zookeeperQuorum", "localhost");
+ map.put("zookeeperClientPort", "2181");
+ map.put("zookeeperZnodeParent", "/hbase");
+ map.put("tableName", "pulsar_hbase");
+ map.put("rowKeyName", "rowKey");
+ map.put("familyName", "info");
+ List<String> qualifierNames = new ArrayList<>();
+ qualifierNames.add("qualifierName");
+ map.put("qualifierNames", qualifierNames);
+
+ HbaseSinkConfig config = HbaseSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class,
+ expectedExceptionsMessageRegExp = "hbase tableName property not set.")
+ public final void missingValidValidateTableNameTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("zookeeperQuorum", "localhost");
+ map.put("zookeeperClientPort", "2181");
+ map.put("zookeeperZnodeParent", "/hbase");
+ map.put("rowKeyName", "rowKey");
+ map.put("familyName", "info");
+ List<String> qualifierNames = new ArrayList<>();
+ qualifierNames.add("qualifierName");
+ map.put("qualifierNames", qualifierNames);
+
+ HbaseSinkConfig config = HbaseSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = MismatchedInputException.class)
+ public final void invalidListValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("zookeeperQuorum", "localhost");
+ map.put("zookeeperClientPort", "2181");
+ map.put("zookeeperZnodeParent", "/hbase");
+ map.put("tableName", "pulsar_hbase");
+ map.put("rowKeyName", "rowKey");
+ map.put("familyName", "info");
+ map.put("qualifierNames", new ArrayList<>().add("name"));
+
+ HbaseSinkConfig config = HbaseSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+ public final void invalidBatchTimeMsTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("zookeeperQuorum", "localhost");
+ map.put("zookeeperClientPort", "2181");
+ map.put("zookeeperZnodeParent", "/hbase");
+ map.put("tableName", "pulsar_hbase");
+ map.put("rowKeyName", "rowKey");
+ map.put("familyName", "info");
+ List<String> qualifierNames = new ArrayList<>();
+ qualifierNames.add("qualifierName");
+ map.put("qualifierNames", qualifierNames);
+ map.put("batchTimeMs",-10);
+
+ HbaseSinkConfig config = HbaseSinkConfig.load(map);
+ config.validate();
+ }
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+}
diff --git a/pulsar-io/hbase/src/test/resources/hbase/hbase-site.xml b/pulsar-io/hbase/src/test/resources/hbase/hbase-site.xml
new file mode 100644
index 0000000..c2b520a
--- /dev/null
+++ b/pulsar-io/hbase/src/test/resources/hbase/hbase-site.xml
@@ -0,0 +1,42 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>hbase.cluster.distributed</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.rootdir</name>
+ <value>hdfs://localhost:8020/hbase</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>localhost</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>zookeeper.znode.parent</name>
+ <value>/hbase</value>
+ </property>
+</configuration>
diff --git a/pulsar-io/hbase/src/test/resources/sinkConfig.yaml b/pulsar-io/hbase/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..bafd4b4
--- /dev/null
+++ b/pulsar-io/hbase/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"hbaseConfigResources": "hbase-site.xml",
+"zookeeperQuorum": "localhost",
+"zookeeperClientPort": "2181",
+"zookeeperZnodeParent": "/hbase",
+"tableName": "pulsar_hbase",
+"rowKeyName": "rowKey",
+"familyName": "info",
+"qualifierNames": [ 'name', 'address', 'age']
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index ea433ce..fb88341 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -50,6 +50,7 @@
<module>canal</module>
<module>file</module>
<module>netty</module>
+ <module>hbase</module>
</modules>
</project>
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index c81ee98..909e554 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -18,3 +18,4 @@ Pulsar Functions cluster.
- [Twitter Firehose Source Connector](io-twitter.md)
- [CDC Source Connector based on Debezium](io-cdc.md)
- [Netty Source Connector](io-netty.md#source)
+- [HBase Sink Connector](io-hbase.md#sink)
diff --git a/site2/docs/io-hbase.md b/site2/docs/io-hbase.md
new file mode 100644
index 0000000..c3dc452
--- /dev/null
+++ b/site2/docs/io-hbase.md
@@ -0,0 +1,27 @@
+---
+id: io-hbase
+title: HBase Connector
+sidebar_label: HBase Connector
+---
+
+## Sink
+
+The HBase Sink Connector pulls messages from Pulsar topics and persists them
+to an HBase table.
+
+## Sink Configuration Options
+
+All the HBase sink settings are listed below. Settings marked `true` in the Required column must be set to run an HBase sink; the others fall back to the listed default.
+
+| Name | Default | Required | Description |
+|------|---------|----------|-------------|
+| `hbaseConfigResources` | `null` | `false` | HBase configuration file to load, such as `hbase-site.xml`. |
+| `zookeeperQuorum` | `null` | `true` | Value of the HBase `hbase.zookeeper.quorum` setting. |
+| `zookeeperClientPort` | `2181` | `false` | Value of the HBase `hbase.zookeeper.property.clientPort` setting. |
+| `zookeeperZnodeParent` | `/hbase` | `false` | Value of the HBase `zookeeper.znode.parent` setting. |
+| `tableName` | `null` | `true` | HBase table name in the form `namespace:tableName`; the namespace defaults to `default`. |
+| `rowKeyName` | `null` | `true` | HBase table row key name. |
+| `familyName` | `null` | `true` | HBase table column family name. |
+| `qualifierNames` | `null` | `true` | HBase table column qualifier names. |
+| `batchTimeMs` | `1000` | `false` | HBase table operation timeout in milliseconds. Must be a positive value. |
+| `batchSize` | `200` | `false` | The batch size of updates made to the hbase table. |