You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/11 14:54:42 UTC

[flink] branch master updated: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 01031ad  [FLINK-10245][hbase] Add an upsert table sink factory for HBase
01031ad is described below

commit 01031ad2daad4d02006c527b122856118dd21434
Author: Shimin Yang <ya...@youzan.com>
AuthorDate: Tue Aug 28 18:25:33 2018 +0800

    [FLINK-10245][hbase] Add an upsert table sink factory for HBase
    
    This commit adds full support for HBase to be used with Table & SQL API.
    
    It includes:
    - HBase upsert table sink (for append-only and updating queries)
    - HBase table factory
    - HBase table descriptors & validators
    - Unit tests
    
    This closes #9075
---
 .../flink/addons/hbase/HBaseLookupFunction.java    |   6 +-
 .../apache/flink/addons/hbase/HBaseOptions.java    | 135 +++++++++++
 .../flink/addons/hbase/HBaseRowInputFormat.java    |   6 +-
 .../flink/addons/hbase/HBaseTableFactory.java      |  60 ++++-
 .../flink/addons/hbase/HBaseTableSchema.java       |   6 +-
 .../addons/hbase/HBaseUpsertSinkFunction.java      | 254 +++++++++++++++++++++
 .../flink/addons/hbase/HBaseUpsertTableSink.java   | 133 +++++++++++
 .../flink/addons/hbase/HBaseWriteOptions.java      | 151 ++++++++++++
 ...seReadHelper.java => HBaseReadWriteHelper.java} |  67 +++++-
 .../flink/table/descriptors/HBaseValidator.java    |  20 +-
 .../addons/hbase/HBaseLookupFunctionITCase.java    |   8 +-
 .../apache/flink/addons/hbase/HBaseSinkITCase.java | 186 +++++++++++++++
 .../flink/addons/hbase/HBaseTableFactoryTest.java  |  72 ++++--
 .../hbase/HBaseTestingClusterAutostarter.java      |   7 +
 14 files changed, 1068 insertions(+), 43 deletions(-)

diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
index 938d834..4176efc 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
-import org.apache.flink.addons.hbase.util.HBaseReadHelper;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.functions.FunctionContext;
@@ -53,7 +53,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
 	private final byte[] serializedConfig;
 	private final HBaseTableSchema hbaseTableSchema;
 
-	private transient HBaseReadHelper readHelper;
+	private transient HBaseReadWriteHelper readHelper;
 	private transient Connection hConnection;
 	private transient HTable table;
 
@@ -115,7 +115,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
 			LOG.error("Exception while creating connection to HBase.", ioe);
 			throw new RuntimeException("Cannot create connection to HBase.", ioe);
 		}
-		this.readHelper = new HBaseReadHelper(hbaseTableSchema);
+		this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
 		LOG.info("end open.");
 	}
 
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java
new file mode 100644
index 0000000..1317e96
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Common Options for HBase.
+ */
+public class HBaseOptions {
+
+	private final String tableName;
+	private final String zkQuorum;
+	@Nullable private final String zkNodeParent;
+
+	private HBaseOptions(String tableName, String zkQuorum, @Nullable String zkNodeParent) {
+		this.tableName = tableName;
+		this.zkQuorum = zkQuorum;
+		this.zkNodeParent = zkNodeParent;
+	}
+
+	String getTableName() {
+		return tableName;
+	}
+
+	String getZkQuorum() {
+		return zkQuorum;
+	}
+
+	Optional<String> getZkNodeParent() {
+		return Optional.ofNullable(zkNodeParent);
+	}
+
+	@Override
+	public String toString() {
+		return "HBaseOptions{" +
+			"tableName='" + tableName + '\'' +
+			", zkQuorum='" + zkQuorum + '\'' +
+			", zkNodeParent='" + zkNodeParent + '\'' +
+			'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		HBaseOptions that = (HBaseOptions) o;
+		return Objects.equals(tableName, that.tableName) &&
+			Objects.equals(zkQuorum, that.zkQuorum) &&
+			Objects.equals(zkNodeParent, that.zkNodeParent);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(tableName, zkQuorum, zkNodeParent);
+	}
+
+	/**
+	 * Creates a builder of {@link HBaseOptions}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder for {@link HBaseOptions}.
+	 */
+	public static class Builder {
+
+		private String tableName;
+		private String zkQuorum;
+		private String zkNodeParent;
+
+		/**
+		 * Required. Sets the HBase table name.
+		 */
+		public Builder setTableName(String tableName) {
+			checkNotNull(tableName);
+			this.tableName = tableName;
+			return this;
+		}
+
+		/**
+		 * Required. Sets the HBase ZooKeeper quorum configuration.
+		 */
+		public Builder setZkQuorum(String zkQuorum) {
+			checkNotNull(zkQuorum);
+			this.zkQuorum = zkQuorum;
+			return this;
+		}
+
+		/**
+		 * Optional. Sets the root dir in ZK for the HBase cluster. Default is "/hbase".
+		 */
+		public Builder setZkNodeParent(String zkNodeParent) {
+			checkNotNull(zkNodeParent);
+			this.zkNodeParent = zkNodeParent;
+			return this;
+		}
+
+		/**
+		 * Creates an instance of {@link HBaseOptions}.
+		 */
+		public HBaseOptions build() {
+			checkNotNull(zkQuorum, "Zookeeper quorum is not set.");
+			checkNotNull(tableName, "TableName is not set.");
+			return new HBaseOptions(tableName, zkQuorum, zkNodeParent);
+		}
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
index fd2a116..1d4c0ec 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.addons.hbase;
 
-import org.apache.flink.addons.hbase.util.HBaseReadHelper;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -52,7 +52,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 	private final HBaseTableSchema schema;
 
 	private transient org.apache.hadoop.conf.Configuration conf;
-	private transient HBaseReadHelper readHelper;
+	private transient HBaseReadWriteHelper readHelper;
 
 	public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
 		this.tableName = tableName;
@@ -64,7 +64,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 	public void configure(Configuration parameters) {
 		LOG.info("Initializing HBase configuration.");
 		// prepare hbase read helper
-		this.readHelper = new HBaseReadHelper(schema);
+		this.readHelper = new HBaseReadWriteHelper(schema);
 		connectToTable();
 		if (table != null) {
 			scan = getScanner();
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
index 922b26f..26fb973 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
@@ -19,11 +19,14 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.HBaseValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
@@ -39,10 +42,14 @@ import java.util.Map;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
 import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
 import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_NODE_PARENT;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
@@ -50,21 +57,56 @@ import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
 /**
  * Factory for creating configured instances of {@link HBaseTableSource} or sink.
  */
-public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
+public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
 
 	@Override
 	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
 		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
 		Configuration hbaseClientConf = HBaseConfiguration.create();
-		String hbaseZk = properties.get(CONNECTOR_HBASE_ZK_QUORUM);
+		String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
-		String hTableName = descriptorProperties.getString(CONNECTOR_HBASE_TABLE_NAME);
+		descriptorProperties
+			.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
+			.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
+
+		String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
 		TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
 		HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
 		return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
 	}
 
+	@Override
+	public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+		HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
+		hbaseOptionsBuilder.setZkQuorum(descriptorProperties.getString(CONNECTOR_ZK_QUORUM));
+		hbaseOptionsBuilder.setTableName(descriptorProperties.getString(CONNECTOR_TABLE_NAME));
+		descriptorProperties
+			.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
+			.ifPresent(hbaseOptionsBuilder::setZkNodeParent);
+
+		TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
+		HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
+
+		HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
+		descriptorProperties
+			.getOptionalInt(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS)
+			.ifPresent(writeBuilder::setBufferFlushMaxRows);
+		descriptorProperties
+			.getOptionalMemorySize(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE)
+			.ifPresent(v -> writeBuilder.setBufferFlushMaxSizeInBytes(v.getBytes()));
+		descriptorProperties
+			.getOptionalDuration(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL)
+			.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
+
+		return new HBaseUpsertTableSink(
+			hbaseSchema,
+			hbaseOptionsBuilder.build(),
+			writeBuilder.build()
+		);
+	}
+
 	private HBaseTableSchema validateTableSchema(TableSchema schema) {
 		HBaseTableSchema hbaseSchema = new HBaseTableSchema();
 		String[] fieldNames = schema.getFieldNames();
@@ -106,8 +148,12 @@ public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
 	public List<String> supportedProperties() {
 		List<String> properties = new ArrayList<>();
 
-		properties.add(CONNECTOR_HBASE_TABLE_NAME);
-		properties.add(CONNECTOR_HBASE_ZK_QUORUM);
+		properties.add(CONNECTOR_TABLE_NAME);
+		properties.add(CONNECTOR_ZK_QUORUM);
+		properties.add(CONNECTOR_ZK_NODE_PARENT);
+		properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE);
+		properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS);
+		properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL);
 
 		// schema
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index af642d1..0dd12de 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -54,7 +54,7 @@ public class HBaseTableSchema implements Serializable {
 	 * @param qualifier the qualifier name
 	 * @param clazz     the data type of the qualifier
 	 */
-	void addColumn(String family, String qualifier, Class<?> clazz) {
+	public void addColumn(String family, String qualifier, Class<?> clazz) {
 		Preconditions.checkNotNull(family, "family name");
 		Preconditions.checkNotNull(qualifier, "qualifier name");
 		Preconditions.checkNotNull(clazz, "class type");
@@ -78,7 +78,7 @@ public class HBaseTableSchema implements Serializable {
 	 * @param rowKeyName the row key field name
 	 * @param clazz the data type of the row key
 	 */
-	void setRowKey(String rowKeyName, Class<?> clazz) {
+	public void setRowKey(String rowKeyName, Class<?> clazz) {
 		Preconditions.checkNotNull(rowKeyName, "row key field name");
 		Preconditions.checkNotNull(clazz, "row key class type");
 		if (!HBaseTypeUtils.isSupportedType(clazz)) {
@@ -97,7 +97,7 @@ public class HBaseTableSchema implements Serializable {
 	 *
 	 * @param charset the charset for value strings and HBase identifiers.
 	 */
-	void setCharset(String charset) {
+	public void setCharset(String charset) {
 		this.charset = charset;
 	}
 
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java
new file mode 100644
index 0000000..0c6bfae
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The upsert sink for HBase.
+ *
+ * <p>This class leverage {@link BufferedMutator} to buffer multiple
+ * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster.
+ * The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes},
+ * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.</p>
+ */
+public class HBaseUpsertSinkFunction
+		extends RichSinkFunction<Tuple2<Boolean, Row>>
+		implements CheckpointedFunction, BufferedMutator.ExceptionListener {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);
+
+	private final String hTableName;
+	private final HBaseTableSchema schema;
+	private final byte[] serializedConfig;
+
+	private final long bufferFlushMaxSizeInBytes;
+	private final long bufferFlushMaxMutations;
+	private final long bufferFlushIntervalMillis;
+
+	private transient HBaseReadWriteHelper helper;
+
+	private transient Connection connection;
+	private transient BufferedMutator mutator;
+
+	private transient ScheduledExecutorService executor;
+	private transient ScheduledFuture scheduledFuture;
+	private transient AtomicLong numPendingRequests;
+
+	private transient volatile boolean closed = false;
+
+	/**
+	 * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
+	 * was thrown.
+	 *
+	 * <p>Errors will be checked and rethrown before processing each input element, and when the sink is closed.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	public HBaseUpsertSinkFunction(
+			String hTableName,
+			HBaseTableSchema schema,
+			org.apache.hadoop.conf.Configuration conf,
+			long bufferFlushMaxSizeInBytes,
+			long bufferFlushMaxMutations,
+			long bufferFlushIntervalMillis) {
+		checkArgument(schema.getRowKeyName().isPresent(), "HBaseUpsertSinkFunction requires rowkey is set.");
+		this.hTableName = hTableName;
+		this.schema = schema;
+		// Configuration is not serializable
+		this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
+		this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+		this.bufferFlushMaxMutations = bufferFlushMaxMutations;
+		this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		LOG.info("start open ...");
+		org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+		try {
+			this.helper = new HBaseReadWriteHelper(schema);
+			this.numPendingRequests = new AtomicLong(0);
+
+			if (null == connection) {
+				this.connection = ConnectionFactory.createConnection(config);
+			}
+			// create a parameter instance, set the table name and custom listener reference.
+			BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
+				.listener(this)
+				.writeBufferSize(bufferFlushMaxSizeInBytes);
+			this.mutator = connection.getBufferedMutator(params);
+
+			if (bufferFlushIntervalMillis > 0) {
+				this.executor = Executors.newScheduledThreadPool(
+					1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
+				this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
+					if (closed) {
+						return;
+					}
+					try {
+						flush();
+					} catch (Exception e) {
+						// fail the sink and skip the rest of the items
+						// if the failure handler decides to throw an exception
+						failureThrowable.compareAndSet(null, e);
+					}
+				}, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
+			}
+		} catch (TableNotFoundException tnfe) {
+			LOG.error("The table " + hTableName + " not found ", tnfe);
+			throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
+		} catch (IOException ioe) {
+			LOG.error("Exception while creating connection to HBase.", ioe);
+			throw new RuntimeException("Cannot create connection to HBase.", ioe);
+		}
+		LOG.info("end open.");
+	}
+
+	private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
+		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+		// and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+		// user params from client-side have the highest priority
+		org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+
+		// do validation: check key option(s) in final runtime configuration
+		if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+			LOG.error("Can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+			throw new IOException("Check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+		}
+
+		return runtimeConfig;
+	}
+
+	private void checkErrorAndRethrow() {
+		Throwable cause = failureThrowable.get();
+		if (cause != null) {
+			throw new RuntimeException("An error occurred in HBaseSink.", cause);
+		}
+	}
+
+	@Override
+	public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (value.f0) {
+			Put put = helper.createPutMutation(value.f1);
+			mutator.mutate(put);
+		} else {
+			Delete delete = helper.createDeleteMutation(value.f1);
+			mutator.mutate(delete);
+		}
+
+		// flush when the buffer number of mutations greater than the configured max size.
+		if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
+			flush();
+		}
+	}
+
+	private void flush() throws IOException {
+		// BufferedMutator is thread-safe
+		mutator.flush();
+		numPendingRequests.set(0);
+		checkErrorAndRethrow();
+	}
+
+	@Override
+	public void close() throws Exception {
+		closed = true;
+
+		if (mutator != null) {
+			try {
+				mutator.close();
+			} catch (IOException e) {
+				LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
+			}
+			this.mutator = null;
+		}
+
+		if (connection != null) {
+			try {
+				connection.close();
+			} catch (IOException e) {
+				LOG.warn("Exception occurs while closing HBase Connection.", e);
+			}
+			this.connection = null;
+		}
+
+		if (scheduledFuture != null) {
+			scheduledFuture.cancel(false);
+			if (executor != null) {
+				executor.shutdownNow();
+			}
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		while (numPendingRequests.get() != 0) {
+			flush();
+		}
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// nothing to do.
+	}
+
+	@Override
+	public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
+		// fail the sink and skip the rest of the items
+		// if the failure handler decides to throw an exception
+		failureThrowable.compareAndSet(null, exception);
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
new file mode 100644
index 0000000..1dc7f97
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An upsert {@link UpsertStreamTableSink} for HBase.
+ */
+public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
+
+	private final HBaseTableSchema hbaseTableSchema;
+	private final TableSchema tableSchema;
+	private final HBaseOptions hbaseOptions;
+	private final HBaseWriteOptions writeOptions;
+
+	public HBaseUpsertTableSink(
+			HBaseTableSchema hbaseTableSchema,
+			HBaseOptions hbaseOptions,
+			HBaseWriteOptions writeOptions) {
+		checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), "HBaseUpsertTableSink requires rowkey is set.");
+		this.hbaseTableSchema = hbaseTableSchema;
+		this.tableSchema = hbaseTableSchema.convertsToTableSchema();
+		this.hbaseOptions = hbaseOptions;
+		this.writeOptions = writeOptions;
+	}
+
+	@Override
+	public void setKeyFields(String[] keys) {
+		// hbase always upsert on rowkey, ignore query keys.
+		// Actually, we should verify the query key is the same with rowkey.
+		// However, the query key extraction doesn't work well in some scenarios
+		// (e.g. concat key fields will lose key information). So we skip key validation currently.
+	}
+
+	@Override
+	public void setIsAppendOnly(Boolean isAppendOnly) {
+		// hbase always upsert on rowkey, even works in append only mode.
+	}
+
+	@Override
+	public TypeInformation<Row> getRecordType() {
+		return tableSchema.toRowType();
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return tableSchema;
+	}
+
+	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+		Configuration hbaseClientConf = HBaseConfiguration.create();
+		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseOptions.getZkQuorum());
+		hbaseOptions.getZkNodeParent().ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
+		HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
+			hbaseOptions.getTableName(),
+			hbaseTableSchema,
+			hbaseClientConf,
+			writeOptions.getBufferFlushMaxSizeInBytes(),
+			writeOptions.getBufferFlushMaxRows(),
+			writeOptions.getBufferFlushIntervalMillis());
+		return dataStream
+			.addSink(sinkFunction)
+			.setParallelism(dataStream.getParallelism())
+			.name(TableConnectorUtils.generateRuntimeName(this.getClass(), tableSchema.getFieldNames()));
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	@Override
+	public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
+			throw new ValidationException("Reconfiguration with different fields is not allowed. " +
+				"Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
+				"But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
+		}
+
+		return new HBaseUpsertTableSink(hbaseTableSchema, hbaseOptions, writeOptions);
+	}
+
+	@VisibleForTesting
+	HBaseTableSchema getHBaseTableSchema() {
+		return hbaseTableSchema;
+	}
+
+	@VisibleForTesting
+	HBaseOptions getHBaseOptions() {
+		return hbaseOptions;
+	}
+
+	@VisibleForTesting
+	HBaseWriteOptions getWriteOptions() {
+		return writeOptions;
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseWriteOptions.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseWriteOptions.java
new file mode 100644
index 0000000..17f5d1d
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseWriteOptions.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Options for HBase writing.
+ */
+public class HBaseWriteOptions implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long bufferFlushMaxSizeInBytes;
+	private final long bufferFlushMaxRows;
+	private final long bufferFlushIntervalMillis;
+
+	private HBaseWriteOptions(
+			long bufferFlushMaxSizeInBytes,
+			long bufferFlushMaxMutations,
+			long bufferFlushIntervalMillis) {
+		this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+		this.bufferFlushMaxRows = bufferFlushMaxMutations;
+		this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+	}
+
+	long getBufferFlushMaxSizeInBytes() {
+		return bufferFlushMaxSizeInBytes;
+	}
+
+	long getBufferFlushMaxRows() {
+		return bufferFlushMaxRows;
+	}
+
+	long getBufferFlushIntervalMillis() {
+		return bufferFlushIntervalMillis;
+	}
+
+	@Override
+	public String toString() {
+		return "HBaseWriteOptions{" +
+			"bufferFlushMaxSizeInBytes=" + bufferFlushMaxSizeInBytes +
+			", bufferFlushMaxRows=" + bufferFlushMaxRows +
+			", bufferFlushIntervalMillis=" + bufferFlushIntervalMillis +
+			'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		HBaseWriteOptions that = (HBaseWriteOptions) o;
+		return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes &&
+			bufferFlushMaxRows == that.bufferFlushMaxRows &&
+			bufferFlushIntervalMillis == that.bufferFlushIntervalMillis;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(bufferFlushMaxSizeInBytes, bufferFlushMaxRows, bufferFlushIntervalMillis);
+	}
+
+	/**
+	 * Creates a builder for {@link HBaseWriteOptions}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder for {@link HBaseWriteOptions}.
+	 */
+	public static class Builder {
+
+		// default is 2mb which is defined in hbase
+		private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
+		private long bufferFlushMaxRows = -1;
+		private long bufferFlushIntervalMillis = -1;
+
+		/**
+		 * Optional. Sets when to flush a buffered request based on the memory size of rows currently added.
+		 * Default to <code>2mb</code>.
+		 */
+		public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes) {
+			checkArgument(
+				bufferFlushMaxSizeInBytes > 0,
+				"Max byte size of buffered rows must be larger than 0.");
+			this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+			return this;
+		}
+
+		/**
+		 * Optional. Sets when to flush buffered request based on the number of rows currently added.
+		 * Defaults to not set, i.e. won't flush based on the number of buffered rows.
+		 */
+		public Builder setBufferFlushMaxRows(long bufferFlushMaxRows) {
+			checkArgument(
+				bufferFlushMaxRows > 0,
+				"Max number of buffered rows must be larger than 0.");
+			this.bufferFlushMaxRows = bufferFlushMaxRows;
+			return this;
+		}
+
+		/**
+		 * Optional. Sets a flush interval flushing buffered requesting if the interval passes, in milliseconds.
+		 * Defaults to not set, i.e. won't flush based on flush interval.
+		 */
+		public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis) {
+			checkArgument(
+				bufferFlushIntervalMillis > 0,
+				"Interval (in milliseconds) between each flush must be larger than 0.");
+			this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+			return this;
+		}
+
+		/**
+		 * Creates a new instance of {@link HBaseWriteOptions}.
+		 */
+		public HBaseWriteOptions build() {
+			return new HBaseWriteOptions(
+				bufferFlushMaxSizeInBytes,
+				bufferFlushMaxRows,
+				bufferFlushIntervalMillis);
+		}
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadWriteHelper.java
similarity index 69%
rename from flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
rename to flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadWriteHelper.java
index 1f8fac2..e4211cd 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadWriteHelper.java
@@ -22,18 +22,20 @@ import org.apache.flink.addons.hbase.HBaseTableSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 
 import java.nio.charset.Charset;
 
 /**
- * A read helper for HBase. The helper can used to create a {@link Scan} and {@link Get}
- * for scanning or lookuping a HBase table, and supports converting the HBase {@link Result}
- * to Flink {@link Row}.
+ * A read and write helper for HBase. The helper can used to create a {@link Scan} and {@link Get}
+ * for scanning or lookuping a HBase table, and create a {@link Put} and {@link Delete} for writing
+ * to HBase table, and supports converting the HBase {@link Result} to Flink {@link Row}.
  */
-public class HBaseReadHelper {
+public class HBaseReadWriteHelper {
 
 	// family keys
 	private final byte[][] families;
@@ -55,7 +57,7 @@ public class HBaseReadHelper {
 	// nested family rows
 	private Row[] familyRows;
 
-	public HBaseReadHelper(HBaseTableSchema hbaseTableSchema) {
+	public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema) {
 		this.families = hbaseTableSchema.getFamilyKeys();
 		this.qualifiers = new byte[this.families.length][][];
 		this.qualifierTypes = new int[this.families.length][];
@@ -163,4 +165,59 @@ public class HBaseReadHelper {
 		}
 		return resultRow;
 	}
+
+	/**
+	 * Returns an instance of Put that writes record to HBase table.
+	 *
+	 * @return The appropriate instance of Put for this use case.
+	 */
+	public Put createPutMutation(Row row) {
+		assert rowKeyIndex != -1;
+		byte[] rowkey = HBaseTypeUtils.serializeFromObject(row.getField(rowKeyIndex), rowKeyType, charset);
+		// upsert
+		Put put = new Put(rowkey);
+		for (int i = 0; i < fieldLength; i++) {
+			if (i != rowKeyIndex) {
+				int f = i > rowKeyIndex ? i - 1 : i;
+				// get family key
+				byte[] familyKey = families[f];
+				Row familyRow = (Row) row.getField(i);
+				for (int q = 0; q < this.qualifiers[f].length; q++) {
+					// get quantifier key
+					byte[] qualifier = qualifiers[f][q];
+					// get quantifier type idx
+					int typeIdx = qualifierTypes[f][q];
+					// read value
+					byte[] value = HBaseTypeUtils.serializeFromObject(familyRow.getField(q), typeIdx, charset);
+					put.addColumn(familyKey, qualifier, value);
+				}
+			}
+		}
+		return put;
+	}
+
+	/**
+	 * Returns an instance of Delete that remove record from HBase table.
+	 *
+	 * @return The appropriate instance of Delete for this use case.
+	 */
+	public Delete createDeleteMutation(Row row) {
+		assert rowKeyIndex != -1;
+		byte[] rowkey = HBaseTypeUtils.serializeFromObject(row.getField(rowKeyIndex), rowKeyType, charset);
+		// delete
+		Delete delete = new Delete(rowkey);
+		for (int i = 0; i < fieldLength; i++) {
+			if (i != rowKeyIndex) {
+				int f = i > rowKeyIndex ? i - 1 : i;
+				// get family key
+				byte[] familyKey = families[f];
+				for (int q = 0; q < this.qualifiers[f].length; q++) {
+					// get quantifier key
+					byte[] qualifier = qualifiers[f][q];
+					delete.addColumn(familyKey, qualifier);
+				}
+			}
+		}
+		return delete;
+	}
 }
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
index fd844e9..e08909c 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
@@ -32,18 +32,30 @@ public class HBaseValidator extends ConnectorDescriptorValidator {
 
 	public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
 	public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
-	public static final String CONNECTOR_HBASE_TABLE_NAME = "connector.table-name";
-	public static final String CONNECTOR_HBASE_ZK_QUORUM = "connector.zookeeper.quorum";
+	public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
+	public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
+	public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
+	public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
+	public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
+	public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
 
 	@Override
 	public void validate(DescriptorProperties properties) {
 		super.validate(properties);
 		properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE, false);
-		properties.validateString(CONNECTOR_HBASE_TABLE_NAME, false, 1);
-		properties.validateString(CONNECTOR_HBASE_ZK_QUORUM, false, 1);
+		properties.validateString(CONNECTOR_TABLE_NAME, false, 1);
+		properties.validateString(CONNECTOR_ZK_QUORUM, false, 1);
+		properties.validateString(CONNECTOR_ZK_NODE_PARENT, true, 1);
+		validateSinkProperties(properties);
 		validateVersion(properties);
 	}
 
+	private void validateSinkProperties(DescriptorProperties properties) {
+		properties.validateMemorySize(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE, true, 1024 * 1024); // only allow MB precision
+		properties.validateInt(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS, true, 1);
+		properties.validateDuration(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL, true, 1);
+	}
+
 	private void validateVersion(DescriptorProperties properties) {
 		final List<String> versions = Arrays.asList(CONNECTOR_VERSION_VALUE_143);
 		properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
index a0822a3..731cf31 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
@@ -51,10 +51,10 @@ import java.util.Map;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
 import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
 import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 
 /**
@@ -145,10 +145,10 @@ public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
 		properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
 		properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
 		properties.put(CONNECTOR_PROPERTY_VERSION, "1");
-		properties.put(CONNECTOR_HBASE_TABLE_NAME, HTABLE_NAME);
+		properties.put(CONNECTOR_TABLE_NAME, HTABLE_NAME);
 		// get zk quorum from "hbase-site.xml" in classpath
 		String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
-		properties.put(CONNECTOR_HBASE_ZK_QUORUM, hbaseZk);
+		properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
 		// schema
 		String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
 		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
new file mode 100644
index 0000000..ef1445d
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * IT case Test for {@link HBaseUpsertTableSink}.
+ */
+public class HBaseSinkITCase extends HBaseTestingClusterAutostarter {
+	private static final long serialVersionUID = 1L;
+
+	private static final String TEST_TABLE = "testTable";
+
+	private static final String FAMILY1 = "family1";
+	private static final String F1COL1 = "col1";
+
+	private static final String FAMILY2 = "family2";
+	private static final String F2COL1 = "col1";
+	private static final String F2COL2 = "col2";
+
+	private static final String FAMILY3 = "family3";
+	private static final String F3COL1 = "col1";
+	private static final String F3COL2 = "col2";
+	private static final String F3COL3 = "col3";
+
+	// prepare a source collection.
+	private static final List<Row> testData1 = new ArrayList<>();
+	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
+		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
+		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+
+	static {
+		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
+		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+	}
+
+	@BeforeClass
+	public static void activateHBaseCluster() throws IOException {
+		registerHBaseMiniClusterInClasspath();
+		createHBaseTable();
+	}
+
+	private static void createHBaseTable() {
+		// create a table
+		TableName tableName = TableName.valueOf(TEST_TABLE);
+		// column families
+		byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
+		// split keys
+		byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
+		createTable(tableName, families, splitKeys);
+	}
+
+	@Test
+	public void testTableSink() throws Exception {
+		HBaseTableSchema schema = new HBaseTableSchema();
+		schema.addColumn(FAMILY1, F1COL1, Integer.class);
+		schema.addColumn(FAMILY2, F2COL1, String.class);
+		schema.addColumn(FAMILY2, F2COL2, Long.class);
+		schema.setRowKey("rk", Integer.class);
+		schema.addColumn(FAMILY3, F3COL1, Double.class);
+		schema.addColumn(FAMILY3, F3COL2, Boolean.class);
+		schema.addColumn(FAMILY3, F3COL3, String.class);
+
+		Map<String, String> tableProperties = new HashMap<>();
+		tableProperties.put("connector.type", "hbase");
+		tableProperties.put("connector.version", "1.4.3");
+		tableProperties.put("connector.property-version", "1");
+		tableProperties.put("connector.table-name", TEST_TABLE);
+		tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
+		tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
+		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
+		descriptorProperties.putProperties(tableProperties);
+		TableSink tableSink = TableFactoryService
+			.find(HBaseTableFactory.class, descriptorProperties.asMap())
+			.createTableSink(descriptorProperties.asMap());
+
+		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		execEnv.setParallelism(4);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv);
+		StreamITCase.clear();
+
+		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+		tEnv.registerDataStream("src", ds);
+		tEnv.registerTableSink("hbase", tableSink);
+
+		String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+		tEnv.sqlUpdate(query);
+
+		// wait to finish
+		tEnv.execute("HBase Job");
+
+		// start a batch scan job to verify contents in HBase table
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
+
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		hbaseTable.setRowKey("rowkey", Integer.class);
+		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+		tableEnv.registerTableSource("hTable", hbaseTable);
+
+		Table result = tableEnv.sqlQuery(
+			"SELECT " +
+				"  h.rowkey, " +
+				"  h.family1.col1, " +
+				"  h.family2.col1, " +
+				"  h.family2.col2, " +
+				"  h.family3.col1, " +
+				"  h.family3.col2, " +
+				"  h.family3.col3 " +
+				"FROM hTable AS h"
+		);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected =
+				"1,10,Hello-1,100,1.01,false,Welt-1\n" +
+				"2,20,Hello-2,200,2.02,true,Welt-2\n" +
+				"3,30,Hello-3,300,3.03,false,Welt-3\n" +
+				"4,40,,400,4.04,true,Welt-4\n" +
+				"5,50,Hello-5,500,5.05,false,Welt-5\n" +
+				"6,60,Hello-6,600,6.06,true,Welt-6\n" +
+				"7,70,Hello-7,700,7.07,false,Welt-7\n" +
+				"8,80,,800,8.08,true,Welt-8\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
index 4061c0e..eba0cee 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
 
@@ -36,12 +37,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 
 /**
@@ -59,21 +54,25 @@ public class HBaseTableFactoryTest {
 	private DescriptorProperties createDescriptor(String[] columnNames, TypeInformation[] columnTypes) {
 		TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
 
-		Map<String, String> tableServiceLookupConf = new HashMap<>();
-		tableServiceLookupConf.put(CONNECTOR_TYPE, "hbase");
-		tableServiceLookupConf.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
-		tableServiceLookupConf.put(CONNECTOR_PROPERTY_VERSION, "1");
-		tableServiceLookupConf.put(CONNECTOR_HBASE_TABLE_NAME, "testHBastTable");
-		tableServiceLookupConf.put(CONNECTOR_HBASE_ZK_QUORUM, "localhost:2181");
+		Map<String, String> tableProperties = new HashMap<>();
+		tableProperties.put("connector.type", "hbase");
+		tableProperties.put("connector.version", "1.4.3");
+		tableProperties.put("connector.property-version", "1");
+		tableProperties.put("connector.table-name", "testHBastTable");
+		tableProperties.put("connector.zookeeper.quorum", "localhost:2181");
+		tableProperties.put("connector.zookeeper.znode.parent", "/flink");
+		tableProperties.put("connector.write.buffer-flush.max-size", "10mb");
+		tableProperties.put("connector.write.buffer-flush.max-rows", "1000");
+		tableProperties.put("connector.write.buffer-flush.interval", "10s");
 
 		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
 		descriptorProperties.putTableSchema(SCHEMA, tableSchema);
-		descriptorProperties.putProperties(tableServiceLookupConf);
+		descriptorProperties.putProperties(tableProperties);
 		return descriptorProperties;
 	}
 
 	@Test
-	public void testConstructorForNestedSchema() {
+	public void testTableSourceFactory() {
 		String[] columnNames = {FAMILY1, FAMILY2, ROWKEY, FAMILY3};
 
 		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{COL1}, Types.INT);
@@ -102,4 +101,49 @@ public class HBaseTableFactoryTest {
 		Assert.assertArrayEquals(new TypeInformation[]{Types.INT, Types.LONG}, hbaseSchema.getQualifierTypes("f2"));
 		Assert.assertArrayEquals(new TypeInformation[]{Types.DOUBLE, Types.BOOLEAN, Types.STRING}, hbaseSchema.getQualifierTypes("f3"));
 	}
+
+	@Test
+	public void testTableSinkFactory() {
+		String[] columnNames = {ROWKEY, FAMILY1, FAMILY2, FAMILY3};
+		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{COL1, COL2}, Types.DOUBLE, Types.INT);
+		TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{COL1, COL3}, Types.INT, Types.LONG);
+		TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{COL2, COL3}, Types.BOOLEAN, Types.STRING);
+		TypeInformation[] columnTypes = new TypeInformation[]{Types.STRING, f1, f2, f3};
+		DescriptorProperties descriptorProperties = createDescriptor(columnNames, columnTypes);
+
+		TableSink sink = TableFactoryService
+			.find(HBaseTableFactory.class, descriptorProperties.asMap())
+			.createTableSink(descriptorProperties.asMap());
+
+		Assert.assertTrue(sink instanceof HBaseUpsertTableSink);
+
+		HBaseTableSchema hbaseSchema = ((HBaseUpsertTableSink) sink).getHBaseTableSchema();
+		Assert.assertEquals(0, hbaseSchema.getRowKeyIndex());
+		Assert.assertEquals(Optional.of(Types.STRING), hbaseSchema.getRowKeyTypeInfo());
+
+		Assert.assertArrayEquals(new String[]{"f1", "f2", "f3"}, hbaseSchema.getFamilyNames());
+		Assert.assertArrayEquals(new String[]{"c1", "c2"}, hbaseSchema.getQualifierNames("f1"));
+		Assert.assertArrayEquals(new String[]{"c1", "c3"}, hbaseSchema.getQualifierNames("f2"));
+		Assert.assertArrayEquals(new String[]{"c2", "c3"}, hbaseSchema.getQualifierNames("f3"));
+
+		Assert.assertArrayEquals(new TypeInformation[]{Types.DOUBLE, Types.INT}, hbaseSchema.getQualifierTypes("f1"));
+		Assert.assertArrayEquals(new TypeInformation[]{Types.INT, Types.LONG}, hbaseSchema.getQualifierTypes("f2"));
+		Assert.assertArrayEquals(new TypeInformation[]{Types.BOOLEAN, Types.STRING}, hbaseSchema.getQualifierTypes("f3"));
+
+		HBaseOptions expectedHBaseOptions = HBaseOptions.builder()
+			.setTableName("testHBastTable")
+			.setZkQuorum("localhost:2181")
+			.setZkNodeParent("/flink")
+			.build();
+		HBaseOptions actualHBaseOptions = ((HBaseUpsertTableSink) sink).getHBaseOptions();
+		Assert.assertEquals(expectedHBaseOptions, actualHBaseOptions);
+
+		HBaseWriteOptions expectedWriteOptions = HBaseWriteOptions.builder()
+			.setBufferFlushMaxRows(1000)
+			.setBufferFlushIntervalMillis(10 * 1000)
+			.setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
+			.build();
+		HBaseWriteOptions actualWriteOptions = ((HBaseUpsertTableSink) sink).getWriteOptions();
+		Assert.assertEquals(expectedWriteOptions, actualWriteOptions);
+	}
 }
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
index 2e61a28..905f80d 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -157,6 +157,13 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
 		LOG.info("HBase minicluster: Running");
 	}
 
+	/**
+	 * Returns zookeeper quorum value contains the right port number (varies per run).
+	 */
+	static String getZookeeperQuorum() {
+		return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
+	}
+
 	private static File hbaseSiteXmlDirectory;
 	private static File hbaseSiteXmlFile;