You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/31 23:56:11 UTC

[pulsar] branch master updated: Issue #3290 Pulsar IO connector for Hbase sink (#3368)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9871200  Issue #3290 Pulsar IO connector for Hbase sink (#3368)
9871200 is described below

commit 98712008cb83c0157fc2c59e6e85eb1542312883
Author: wpl <12...@qq.com>
AuthorDate: Fri Feb 1 07:56:06 2019 +0800

    Issue #3290 Pulsar IO connector for Hbase sink (#3368)
    
    Supporting the ability to write data to HBase sink #3290
---
 distribution/io/src/assemble/io.xml                |   6 +
 pom.xml                                            |   1 +
 pulsar-io/hbase/pom.xml                            |  95 ++++++++++
 .../pulsar/io/hbase/HbaseAbstractConfig.java       |  81 ++++++++
 .../pulsar/io/hbase/sink/HbaseAbstractSink.java    | 204 +++++++++++++++++++++
 .../io/hbase/sink/HbaseGenericRecordSink.java      |  89 +++++++++
 .../pulsar/io/hbase/sink/HbaseSinkConfig.java      |  99 ++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 +++
 .../org/apache/pulsar/io/hbase/TableUtils.java     |  52 ++++++
 .../io/hbase/sink/HbaseGenericRecordSinkTest.java  | 172 +++++++++++++++++
 .../pulsar/io/hbase/sink/HbaseSinkConfigTest.java  | 155 ++++++++++++++++
 .../hbase/src/test/resources/hbase/hbase-site.xml  |  42 +++++
 pulsar-io/hbase/src/test/resources/sinkConfig.yaml |  29 +++
 pulsar-io/pom.xml                                  |   1 +
 site2/docs/io-connectors.md                        |   1 +
 site2/docs/io-hbase.md                             |  27 +++
 16 files changed, 1076 insertions(+)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 10ea149..f688383 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -105,5 +105,11 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      <source>${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/pom.xml b/pom.xml
index 94dee51..5b6f633 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,6 +186,7 @@ flexible messaging model and an intuitive client API.</description>
     <jsonwebtoken.version>0.10.5</jsonwebtoken.version>
     <opencensus.version>0.12.3</opencensus.version>
     <zstd.version>1.3.7-3</zstd.version>
+    <hbase.version>1.4.9</hbase.version>
 
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
diff --git a/pulsar-io/hbase/pom.xml b/pulsar-io/hbase/pom.xml
new file mode 100644
index 0000000..ca128bd
--- /dev/null
+++ b/pulsar-io/hbase/pom.xml
@@ -0,0 +1,95 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- Inherits from the pulsar-io parent module. -->
+    <parent>
+        <artifactId>pulsar-io</artifactId>
+        <groupId>org.apache.pulsar</groupId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>pulsar-io-hbase</artifactId>
+    <name>Pulsar IO :: Hbase</name>
+
+    <dependencies>
+        <!-- Pulsar modules the connector builds against; each follows this module's own version. -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-functions-instance</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- No version is declared for Jackson or Guava here; presumably managed by a parent POM (confirm). -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <!-- HBase libraries, all pinned to the shared hbase.version property. -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Enables the NiFi NAR plugin for this module; its configuration is not declared here, -->
+            <!-- so it is presumably inherited from a parent POM and produces the .nar archive (confirm). -->
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
new file mode 100644
index 0000000..a9f362d
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+/**
+ * Connection-level configuration shared by the HBase connector components.
+ *
+ * <p>Holds the ZooKeeper coordinates used to reach the HBase cluster, an optional
+ * extra HBase configuration resource, and the name of the target table. Accessors,
+ * {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok;
+ * setters return {@code this} because of {@code @Accessors(chain = true)}.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class HbaseAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 6783394446906640112L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "hbase system configuration 'hbase-site.xml' file")
+    private String hbaseConfigResources;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "hbase system configuration about hbase.zookeeper.quorum value")
+    private String zookeeperQuorum;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "2181",
+        help = "hbase system configuration about hbase.zookeeper.property.clientPort value")
+    private String zookeeperClientPort = "2181";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "/hbase",
+        help = "hbase system configuration about zookeeper.znode.parent value")
+    private String zookeeperZnodeParent = "/hbase";
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "hbase table, value is namespace:tableName, namespace default value is default")
+    private String tableName;
+
+    /**
+     * Verifies that the ZooKeeper quorum, client port, znode parent and table name are all set.
+     *
+     * <p>Only null values are rejected; empty or blank strings pass this check.
+     *
+     * @throws NullPointerException if any of the four properties is {@code null}
+     */
+    public void validate() {
+        Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum property not set.");
+        Preconditions.checkNotNull(zookeeperClientPort, "zookeeperClientPort property not set.");
+        Preconditions.checkNotNull(zookeeperZnodeParent, "zookeeperZnodeParent property not set.");
+        Preconditions.checkNotNull(tableName, "hbase tableName property not set.");
+    }
+}
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
new file mode 100644
index 0000000..17ff802
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Simple abstract class for Hbase sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class HbaseAbstractSink<T> implements Sink<T> {
+
+    @Data(staticConstructor = "of")
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class TableDefinition {
+        private final String rowKeyName;
+        private final String familyName;
+        private final List<String> qualifierNames;
+    }
+
+    private HbaseSinkConfig hbaseSinkConfig;
+    private Configuration configuration;
+    private Connection connection;
+    private Admin admin;
+    private TableName tableName;
+    private Table table;
+
+    protected TableDefinition tableDefinition;
+
+    // for flush
+    private long batchTimeMs;
+    private int batchSize;
+    private List<Record<T>> incomingList;
+    private ScheduledExecutorService flushExecutor;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        hbaseSinkConfig = HbaseSinkConfig.load(config);
+        Preconditions.checkNotNull(hbaseSinkConfig.getZookeeperQuorum(), "zookeeperQuorum property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getZookeeperClientPort(), "zookeeperClientPort property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getZookeeperZnodeParent(), "zookeeperZnodeParent property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getTableName(), "hbase tableName property not set.");
+
+        getTable(hbaseSinkConfig);
+        tableDefinition = getTableDefinition(hbaseSinkConfig);
+
+        batchTimeMs = hbaseSinkConfig.getBatchTimeMs();
+        batchSize = hbaseSinkConfig.getBatchSize();
+        incomingList = Lists.newArrayList();
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(() -> flush(), batchTimeMs, batchTimeMs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (null != admin) {
+            admin.close();
+        }
+
+        if (null != connection) {
+            connection.close();
+        }
+
+        if (null != flushExecutor) {
+            flushExecutor.shutdown();
+        }
+    }
+
+    @Override
+    public void write(Record<T> record) throws Exception {
+        int number;
+        synchronized (this) {
+            if (null != record) {
+                incomingList.add(record);
+            }
+            number = incomingList.size();
+        }
+
+        if (number == batchSize) {
+            flushExecutor.submit(() -> flush());
+        }
+    }
+
+    private void flush() {
+        List<Put> puts = new ArrayList<>();
+        List<Record<T>>  toFlushList;
+        synchronized (this) {
+            if (incomingList.isEmpty()) {
+                return;
+            }
+            toFlushList = incomingList;
+            incomingList = Lists.newArrayList();
+        }
+
+        if (CollectionUtils.isNotEmpty(toFlushList)) {
+            for (Record<T> record: toFlushList) {
+                try {
+                    bindValue(record, puts);
+                } catch (Exception e) {
+                    record.fail();
+                    toFlushList.remove(record);
+                    log.warn("Record flush thread was exception ", e);
+                }
+            }
+        }
+
+        try {
+            if (CollectionUtils.isNotEmpty(puts)) {
+                table.put(puts);
+                admin.flush(tableName);
+            }
+
+            toFlushList.forEach(tRecord -> tRecord.ack());
+            puts.clear();
+            toFlushList.clear();
+        } catch (Exception e) {
+            toFlushList.forEach(tRecord -> tRecord.fail());
+            log.error("Hbase table put data exception ", e);
+        }
+    }
+
+    // bind value with a Hbase put
+    public abstract void bindValue(Record<T> message, List<Put> puts) throws Exception;
+
+    private void getTable(HbaseSinkConfig hbaseSinkConfig) throws IOException {
+        configuration = HBaseConfiguration.create();
+        String hbaseConfigResources = hbaseSinkConfig.getHbaseConfigResources();
+        if (StringUtils.isNotBlank(hbaseConfigResources)) {
+            configuration.addResource(hbaseConfigResources);
+        }
+
+        configuration.set("hbase.zookeeper.quorum", hbaseSinkConfig.getZookeeperQuorum());
+        configuration.set("hbase.zookeeper.property.clientPort", hbaseSinkConfig.getZookeeperClientPort());
+        configuration.set("zookeeper.znode.parent", hbaseSinkConfig.getZookeeperZnodeParent());
+
+        connection = ConnectionFactory.createConnection(configuration);
+        admin = connection.getAdmin();
+        tableName = TableName.valueOf(hbaseSinkConfig.getTableName());
+        if (!admin.tableExists(this.tableName)) {
+            throw new IllegalArgumentException(this.tableName + " table does not exist.");
+        }
+
+        table = connection.getTable(this.tableName);
+    }
+
+    /**
+     * Get the {@link TableDefinition} for the given table.
+     */
+    private TableDefinition getTableDefinition(HbaseSinkConfig hbaseSinkConfig) throws Exception {
+        Preconditions.checkNotNull(hbaseSinkConfig.getRowKeyName(), "rowKeyName property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getFamilyName(), "familyName property not set.");
+        Preconditions.checkNotNull(hbaseSinkConfig.getQualifierNames(), "qualifierNames property not set.");
+
+        return TableDefinition.of(hbaseSinkConfig.getRowKeyName(), hbaseSinkConfig.getFamilyName(), hbaseSinkConfig.getQualifierNames());
+    }
+}
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
new file mode 100644
index 0000000..3027e34
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.util.List;
+
+/**
+ * A simple HBase sink which interprets each input Record as a {@link GenericRecord}.
+ *
+ * <p>The configured row key field supplies the HBase row key; every configured qualifier that has
+ * a non-null value in the record becomes one column in the configured column family.
+ */
+@Connector(
+    name = "hbase",
+    type = IOType.SINK,
+    help = "The HbaseGenericRecordSink is used for moving messages from Pulsar to Hbase.",
+    configClass = HbaseSinkConfig.class
+)
+@Slf4j
+public class HbaseGenericRecordSink extends HbaseAbstractSink<GenericRecord> {
+
+    /**
+     * Builds a single {@link Put} for the record and appends it to {@code puts}.
+     *
+     * <p>Qualifiers whose value is null are skipped. If every qualifier is null, nothing is
+     * appended and the row key is not inspected.
+     *
+     * @param message the record to convert
+     * @param puts    destination list for the resulting Put
+     * @throws Exception if the row key or a qualifier value is null where a value is required,
+     *                   or has a type that {@link #getBytes(Object)} does not support
+     */
+    @Override
+    public void bindValue(Record<GenericRecord> message, List<Put> puts) throws Exception {
+        GenericRecord record = message.getValue();
+
+        String rowKeyName = tableDefinition.getRowKeyName();
+        Object rowKeyValue = record.getField(rowKeyName);
+
+        // set familyName value from HbaseSinkConfig
+        String familyName = tableDefinition.getFamilyName();
+        byte[] familyValueBytes = getBytes(familyName);
+
+        // All columns of one record share the same row key, so they go into one Put
+        // rather than one Put per qualifier.
+        Put put = null;
+        List<String> qualifierNames = tableDefinition.getQualifierNames();
+        for (String qualifierName : qualifierNames) {
+            Object qualifierValue = record.getField(qualifierName);
+            if (null == qualifierValue) {
+                continue;
+            }
+            if (null == put) {
+                // Created lazily so a record without any column value never touches the row key.
+                put = new Put(getBytes(rowKeyValue));
+            }
+            put.addColumn(familyValueBytes, getBytes(qualifierName), getBytes(qualifierValue));
+        }
+
+        if (null != put) {
+            puts.add(put);
+        }
+    }
+
+    /**
+     * Encodes a supported value as bytes.
+     *
+     * <p>Supported types are Integer, Long, Double, Float, Boolean, String and Short.
+     *
+     * @throws IllegalArgumentException if {@code value} is null
+     * @throws Exception                if the value's type is not supported
+     */
+    private byte[] getBytes(Object value) throws Exception {
+        if (null == value) {
+            // Previously this fell through to value.getClass() and surfaced as a bare NPE.
+            throw new IllegalArgumentException("Cannot convert a null value to bytes.");
+        }
+
+        if (value instanceof Integer) {
+            return IntSchema.of().encode((Integer) value);
+        } else if (value instanceof Long) {
+            return LongSchema.of().encode((Long) value);
+        } else if (value instanceof Double) {
+            return DoubleSchema.of().encode((Double) value);
+        } else if (value instanceof Float) {
+            return FloatSchema.of().encode((Float) value);
+        } else if (value instanceof Boolean) {
+            return Bytes.toBytes((Boolean) value);
+        } else if (value instanceof String) {
+            return StringSchema.utf8().encode((String) value);
+        } else if (value instanceof Short) {
+            return ShortSchema.of().encode((Short) value);
+        } else {
+            throw new Exception("Not support value type, need to add it. " + value.getClass());
+        }
+    }
+}
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
new file mode 100644
index 0000000..b2e2534
--- /dev/null
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.apache.pulsar.io.hbase.HbaseAbstractConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Configuration for the HBase sink: the connection settings inherited from
+ * {@link HbaseAbstractConfig} plus the row/column mapping and batching parameters.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 1245636479605735555L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hbase table rowkey name")
+    private String rowKeyName;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hbase table column family name")
+    private String familyName;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hbase table column qualifier names")
+    private List<String> qualifierNames;
+
+    // The documented default is the plain number; the Java long suffix does not belong in user docs.
+    @FieldDoc(
+        required = false,
+        defaultValue = "1000",
+        help = "The hbase operation time in milliseconds")
+    private long batchTimeMs = 1000L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "200",
+        help = "The batch size of write to the hbase table"
+    )
+    private int batchSize = 200;
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static HbaseSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), HbaseSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a key/value map by round-tripping it through JSON.
+     *
+     * @param map configuration properties keyed by field name
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or mapped onto this class
+     */
+    public static HbaseSinkConfig load(Map<String, Object> map) throws IOException {
+        // One mapper serves both the serialization and the deserialization step.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), HbaseSinkConfig.class);
+    }
+
+    /**
+     * Validates the inherited connection settings, then the sink-specific ones.
+     *
+     * @throws NullPointerException     if a required property is {@code null}
+     * @throws IllegalArgumentException if {@code batchTimeMs} or {@code batchSize} is not positive
+     */
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkNotNull(rowKeyName, "rowKeyName property not set.");
+        Preconditions.checkNotNull(familyName, "familyName property not set.");
+        Preconditions.checkNotNull(qualifierNames, "qualifierNames property not set.");
+        Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
+        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
+    }
+}
diff --git a/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..e0082d2
--- /dev/null
+++ b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Connector descriptor: connector name, a short description, and the sink implementation class.
+name: hbase
+description: Writes data into hbase table
+sinkClass: org.apache.pulsar.io.hbase.sink.HbaseGenericRecordSink
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/TableUtils.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/TableUtils.java
new file mode 100644
index 0000000..7c3c78e
--- /dev/null
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/TableUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test helper that opens an HBase connection and returns a handle to the configured table.
+ */
+public class TableUtils {
+
+    /**
+     * Connects to HBase using the ZooKeeper settings in {@code config} and returns the table
+     * named by its {@code tableName} entry.
+     *
+     * <p>The returned table is obtained from a connection opened here, and that connection stays
+     * open on success. If the lookup fails, the connection is closed before the error propagates.
+     *
+     * @param config map containing {@code zookeeperQuorum}, {@code zookeeperClientPort},
+     *               {@code zookeeperZnodeParent} and {@code tableName}
+     * @return the table handle
+     * @throws IOException              if HBase cannot be reached
+     * @throws IllegalArgumentException if the table does not exist
+     */
+    public static Table getTable(Map<String, Object> config) throws IOException {
+        Configuration configuration = HBaseConfiguration.create();
+        configuration.set("hbase.zookeeper.quorum", config.get("zookeeperQuorum").toString());
+        configuration.set("hbase.zookeeper.property.clientPort", config.get("zookeeperClientPort").toString());
+        configuration.set("zookeeper.znode.parent", config.get("zookeeperZnodeParent").toString());
+
+        Connection connection = ConnectionFactory.createConnection(configuration);
+        // The Admin is only needed for the existence check, so it is closed as soon as that is done.
+        try (Admin admin = connection.getAdmin()) {
+            TableName tableName = TableName.valueOf(config.get("tableName").toString());
+            if (!admin.tableExists(tableName)) {
+                throw new IllegalArgumentException(tableName + " table does not exist.");
+            }
+            return connection.getTable(tableName);
+        } catch (IOException | RuntimeException e) {
+            // No table is handed out on this path, so nothing else would ever close the connection.
+            try {
+                connection.close();
+            } catch (IOException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+    }
+
+}
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
new file mode 100644
index 0000000..2ccb08d
--- /dev/null
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hbase.TableUtils;
+import org.mockito.Mock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for {@link HbaseGenericRecordSink}: writes one Avro-encoded record through the sink
+ * and reads it back from the table to verify the stored cells.
+ */
+@Slf4j
+public class HbaseGenericRecordSinkTest {
+
+    /**
+     * Simple value class used as the payload of the test record.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String rowKey;
+        private String name;
+        private String address;
+        private int age;
+        private boolean flag;
+    }
+
+    private String rowKeyName = "rowKey";
+    private String familyName = "info";
+    private String name = "name";
+    private String address = "address";
+    private String age = "age";
+    private String flag = "flag";
+    @Mock
+    protected SinkContext mockSinkContext;
+
+    /**
+     * Opens the sink, writes a single {@link Foo} record and checks every qualifier in the table.
+     *
+     * <p>Disabled by default. The configuration below points at ZooKeeper on localhost:2181 and
+     * the table {@code default:pulsar_hbase}; presumably a live HBase is needed to run it.
+     */
+    @Test(enabled = false)
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> map = new HashMap<>();
+        map.put("zookeeperQuorum", "localhost");
+        map.put("zookeeperClientPort", "2181");
+        map.put("zookeeperZnodeParent", "/hbase");
+        map.put("tableName", "default:pulsar_hbase");
+        map.put("rowKeyName", rowKeyName);
+        map.put("familyName", familyName);
+
+        List<String> qualifierNames = new ArrayList<>();
+        qualifierNames.add(name);
+        qualifierNames.add(address);
+        qualifierNames.add(age);
+        qualifierNames.add(flag);
+        map.put("qualifierNames",qualifierNames);
+
+        mockSinkContext = mock(SinkContext.class);
+        HbaseGenericRecordSink sink = new HbaseGenericRecordSink();
+
+        // prepare a foo Record
+        Foo obj = new Foo();
+        obj.setRowKey("rowKey_value");
+        obj.setName("name_value");
+        obj.setAddress("address_value");
+        obj.setAge(30);
+        obj.setFlag(true);
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+
+        PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
+        Consumer consumer = mock(Consumer.class);
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "11:111", map, payload, autoConsumeSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .ackFunction(() -> {
+                if (pulsarSourceConfig
+                        .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    consumer.acknowledgeCumulativeAsync(message);
+                } else {
+                    consumer.acknowledgeAsync(message);
+                }
+            }).failFunction(() -> {
+                if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    throw new RuntimeException("Failed to process message: " + message.getMessageId());
+                }
+            })
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+                obj.toString(),
+                message.getValue().toString(),
+                record.getValue().toString());
+
+        // change batchSize to 1, to flush on each write.
+        map.put("batchTimeMs", 1);
+        map.put("batchSize", 1);
+        // open should succeed
+        sink.open(map,mockSinkContext);
+
+        // From here on the sink is open, so make sure it is closed even if a check fails.
+        try {
+            // write should succeed.
+            sink.write(record);
+            log.info("executed write");
+            // sleep to wait for the backend flush to complete
+            Thread.sleep(500);
+
+            // value has been written to hbase table, read it out and verify.
+            // try-with-resources closes the table even when an assertion fails.
+            try (Table table = TableUtils.getTable(map)) {
+                Get get = new Get(Bytes.toBytes(obj.getRowKey()));
+                Result result = table.get(get);
+                byte[] byteName = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(name));
+                byte[] byteAddress = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(address));
+                byte[] byteAge = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(age));
+                byte[] byteFlag = result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(flag));
+                // TestNG's assertEquals takes (actual, expected).
+                Assert.assertEquals(Bytes.toString(byteName), obj.getName());
+                Assert.assertEquals(Bytes.toString(byteAddress), obj.getAddress());
+                Assert.assertEquals(Bytes.toInt(byteAge), obj.getAge());
+                Assert.assertEquals(Bytes.toBoolean(byteFlag), obj.isFlag());
+            }
+        } finally {
+            sink.close();
+        }
+    }
+
+}
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfigTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfigTest.java
new file mode 100644
index 0000000..8312bad
--- /dev/null
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfigTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hbase.sink;
+
+import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests loading and validating {@link HbaseSinkConfig} from a YAML file and from a map.
+ */
+public class HbaseSinkConfigTest {
+
+    // NOTE: TestNG's assertEquals takes (actual, expected), so the loaded value goes first.
+
+    /**
+     * Loads sinkConfig.yaml from the test classpath and checks every configured value.
+     */
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        HbaseSinkConfig config = HbaseSinkConfig.load(path);
+        assertNotNull(config);
+        assertEquals(config.getHbaseConfigResources(), "hbase-site.xml");
+        assertEquals(config.getZookeeperQuorum(), "localhost");
+        assertEquals(config.getZookeeperClientPort(), "2181");
+        assertEquals(config.getZookeeperZnodeParent(), "/hbase");
+        assertEquals(config.getTableName(), "pulsar_hbase");
+        assertEquals(config.getRowKeyName(), "rowKey");
+        assertEquals(config.getFamilyName(), "info");
+
+        List<String> qualifierNames = new ArrayList<>();
+        qualifierNames.add("name");
+        qualifierNames.add("address");
+        qualifierNames.add("age");
+        assertEquals(config.getQualifierNames(), qualifierNames);
+    }
+
+    /**
+     * Loads the configuration from an in-memory map and checks every configured value.
+     */
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("hbaseConfigResources", "hbase-site.xml");
+        map.put("zookeeperQuorum", "localhost");
+        map.put("zookeeperClientPort", "2181");
+        map.put("zookeeperZnodeParent", "/hbase");
+        map.put("tableName", "pulsar_hbase");
+        map.put("rowKeyName", "rowKey");
+        map.put("familyName", "info");
+
+        HbaseSinkConfig config = HbaseSinkConfig.load(map);
+        assertNotNull(config);
+        assertEquals(config.getHbaseConfigResources(), "hbase-site.xml");
+        assertEquals(config.getZookeeperQuorum(), "localhost");
+        assertEquals(config.getZookeeperClientPort(), "2181");
+        assertEquals(config.getZookeeperZnodeParent(), "/hbase");
+        assertEquals(config.getTableName(), "pulsar_hbase");
+        assertEquals(config.getRowKeyName(), "rowKey");
+        assertEquals(config.getFamilyName(), "info");
+
+    }
+
+    /**
+     * A complete configuration must pass validate() without throwing.
+     */
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("zookeeperQuorum", "localhost");
+        map.put("zookeeperClientPort", "2181");
+        map.put("zookeeperZnodeParent", "/hbase");
+        map.put("tableName", "pulsar_hbase");
+        map.put("rowKeyName", "rowKey");
+        map.put("familyName", "info");
+        List<String> qualifierNames = new ArrayList<>();
+        qualifierNames.add("qualifierName");
+        map.put("qualifierNames", qualifierNames);
+
+        HbaseSinkConfig config = HbaseSinkConfig.load(map);
+        config.validate();
+    }
+
+    /**
+     * Leaving out tableName is expected to fail with the "property not set" NullPointerException.
+     */
+    @Test(expectedExceptions = NullPointerException.class,
+            expectedExceptionsMessageRegExp = "hbase tableName property not set.")
+    public final void missingValidValidateTableNameTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("zookeeperQuorum", "localhost");
+        map.put("zookeeperClientPort", "2181");
+        map.put("zookeeperZnodeParent", "/hbase");
+        map.put("rowKeyName", "rowKey");
+        map.put("familyName", "info");
+        List<String> qualifierNames = new ArrayList<>();
+        qualifierNames.add("qualifierName");
+        map.put("qualifierNames", qualifierNames);
+
+        HbaseSinkConfig config = HbaseSinkConfig.load(map);
+        config.validate();
+    }
+
+    /**
+     * A qualifierNames value that is not a list is expected to be rejected with
+     * MismatchedInputException.
+     */
+    @Test(expectedExceptions = MismatchedInputException.class)
+    public final void invalidListValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("zookeeperQuorum", "localhost");
+        map.put("zookeeperClientPort", "2181");
+        map.put("zookeeperZnodeParent", "/hbase");
+        map.put("tableName", "pulsar_hbase");
+        map.put("rowKeyName", "rowKey");
+        map.put("familyName", "info");
+        // Deliberately a boolean instead of a list. This is the value the previous
+        // `new ArrayList<>().add("name")` expression stored, since List.add returns boolean.
+        map.put("qualifierNames", true);
+
+        HbaseSinkConfig config = HbaseSinkConfig.load(map);
+        config.validate();
+    }
+
+    /**
+     * A negative batchTimeMs is expected to fail validation with IllegalArgumentException.
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+    public final void invalidBatchTimeMsTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("zookeeperQuorum", "localhost");
+        map.put("zookeeperClientPort", "2181");
+        map.put("zookeeperZnodeParent", "/hbase");
+        map.put("tableName", "pulsar_hbase");
+        map.put("rowKeyName", "rowKey");
+        map.put("familyName", "info");
+        List<String> qualifierNames = new ArrayList<>();
+        qualifierNames.add("qualifierName");
+        map.put("qualifierNames", qualifierNames);
+        map.put("batchTimeMs",-10);
+
+        HbaseSinkConfig config = HbaseSinkConfig.load(map);
+        config.validate();
+    }
+
+    /**
+     * Resolves a test resource on the classpath to a {@link File}.
+     */
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/hbase/src/test/resources/hbase/hbase-site.xml b/pulsar-io/hbase/src/test/resources/hbase/hbase-site.xml
new file mode 100644
index 0000000..c2b520a
--- /dev/null
+++ b/pulsar-io/hbase/src/test/resources/hbase/hbase-site.xml
@@ -0,0 +1,42 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+    <!-- Sample HBase settings for the connector tests; every endpoint points at localhost. -->
+    <property>
+        <name>hbase.cluster.distributed</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hbase.rootdir</name>
+        <value>hdfs://localhost:8020/hbase</value>
+    </property>
+    <property>
+        <name>hbase.zookeeper.quorum</name>
+        <value>localhost</value>
+    </property>
+    <property>
+        <name>hbase.zookeeper.property.clientPort</name>
+        <value>2181</value>
+    </property>
+    <property>
+        <name>zookeeper.znode.parent</name>
+        <value>/hbase</value>
+    </property>
+</configuration>
diff --git a/pulsar-io/hbase/src/test/resources/sinkConfig.yaml b/pulsar-io/hbase/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..bafd4b4
--- /dev/null
+++ b/pulsar-io/hbase/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# HBase sink settings loaded by HbaseSinkConfigTest (loadFromYamlFileTest).
+hbaseConfigResources: "hbase-site.xml"
+zookeeperQuorum: "localhost"
+zookeeperClientPort: "2181"
+zookeeperZnodeParent: "/hbase"
+tableName: "pulsar_hbase"
+rowKeyName: "rowKey"
+familyName: "info"
+qualifierNames:
+  - "name"
+  - "address"
+  - "age"
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index ea433ce..fb88341 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -50,6 +50,7 @@
     <module>canal</module>
     <module>file</module>
     <module>netty</module>
+    <module>hbase</module>
   </modules>
 
 </project>
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index c81ee98..909e554 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -18,3 +18,4 @@ Pulsar Functions cluster.
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
 - [Netty Source Connector](io-netty.md#source)
+- [Hbase Sink Connector](io-hbase.md#sink)
diff --git a/site2/docs/io-hbase.md b/site2/docs/io-hbase.md
new file mode 100644
index 0000000..c3dc452
--- /dev/null
+++ b/site2/docs/io-hbase.md
@@ -0,0 +1,27 @@
+---
+id: io-hbase
+title: hbase Connector
+sidebar_label: hbase Connector
+---
+
+## Sink
+
+The HBase Sink Connector pulls messages from Pulsar topics and persists them
+to an HBase table.
+
+## Sink Configuration Options
+
+All the HBase sink settings are listed below. Settings marked `true` in the Required column must be provided to run an HBase sink.
+
+| Name | Default | Required | Description |
+|------|---------|----------|-------------|
+| `hbaseConfigResources` | `null` | `false` | hbase system configuration 'hbase-site.xml' file. |
+| `zookeeperQuorum` | `null` | `true` | hbase system configuration about hbase.zookeeper.quorum value. |
+| `zookeeperClientPort` | `2181` | `false` | hbase system configuration about hbase.zookeeper.property.clientPort value. |
+| `zookeeperZnodeParent` | `/hbase` | `false` | hbase system configuration about zookeeper.znode.parent value. |
+| `tableName` | `null` | `true` | hbase table, value is namespace:tableName, namespace default value is default. |
+| `rowKeyName` | `null` | `true` | hbase table rowkey name. |
+| `familyName` | `null` | `true` | hbase table column family name. |
+| `qualifierNames` | `null` | `true` | hbase table column qualifier names. |
+| `batchTimeMs` | `1000` | `false` | hbase table operation timeout in milliseconds. |
+| `batchSize` | `200` | `false` | The batch size of updates made to the hbase table. |