Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/29 12:34:19 UTC

[GitHub] Clarkkkkk closed pull request #6627: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Pr…

Clarkkkkk closed pull request #6627: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Pr…
URL: https://github.com/apache/flink/pull/6627
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBasePojoSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBasePojoSink.java
new file mode 100644
index 00000000000..e2c3b488701
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBasePojoSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.lang.reflect.Field;
+
+/**
+ * Flink Sink to save data into an HBase cluster.
+ * @param <IN> Type of the element emitted by this sink
+ */
+public class HBasePojoSink<IN> extends HBaseSinkBase<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final HBaseTableMapper tableMapper;
+	private final String[] fieldNameList;
+	private final TypeInformation<IN> typeInfo;
+	private transient Field[] fields;
+	private transient Field rowKeyField;
+
+	/**
+	 * The main constructor for creating HBasePojoSink.
+	 *
+	 * @param builder A builder for building the HBase connection and the handle for communicating with a single HBase table.
+	 * @param tableMapper The mapping from a Pojo to an HBase table
+	 * @param typeInfo TypeInformation of the Pojo
+	 */
+	public HBasePojoSink(HBaseTableBuilder builder, HBaseTableMapper tableMapper, TypeInformation<IN> typeInfo) {
+		super(builder);
+		this.tableMapper = tableMapper;
+		this.fieldNameList = tableMapper.getKeyList();
+		this.typeInfo = typeInfo;
+	}
+
+	@VisibleForTesting
+	public HBasePojoSink(Table hTable, HBaseTableMapper tableMapper, TypeInformation<IN> typeInfo) {
+		super(hTable);
+		this.tableMapper = tableMapper;
+		this.fieldNameList = tableMapper.getKeyList();
+		this.typeInfo = typeInfo;
+	}
+
+	@Override
+	protected Object extract(IN value) throws Exception {
+		byte[] rowKey = HBaseTableMapper.serialize(tableMapper.getRowKeyType(), rowKeyField.get(value));
+		Put put = new Put(rowKey);
+		for (int i = 0; i < fieldNameList.length; i++) {
+			Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(fieldNameList[i]);
+			put.addColumn(colInfo.f0, colInfo.f1,
+				HBaseTableMapper.serialize(colInfo.f2, fields[i].get(value)));
+		}
+		return put;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+		Class<IN> clazz = typeInfo.getTypeClass();
+		fields = new Field[fieldNameList.length];
+		for (int i = 0; i < fields.length; i++) {
+			fields[i] = clazz.getDeclaredField(fieldNameList[i]);
+			fields[i].setAccessible(true);
+		}
+		rowKeyField = clazz.getDeclaredField(tableMapper.getRowKey());
+		rowKeyField.setAccessible(true);
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseRowSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseRowSink.java
new file mode 100644
index 00000000000..c882067a67a
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseRowSink.java
@@ -0,0 +1,53 @@
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * Flink Sink to save Rows into an HBase cluster.
+ */
+public class HBaseRowSink extends HBaseSinkBase<Row> {
+	private static final long serialVersionUID = 1L;
+	private final HBaseTableMapper tableMapper;
+	private final RowTypeInfo typeInfo;
+	private final String[] fieldNameList;
+	private int rowKeyFieldIndex;
+
+	public HBaseRowSink(HBaseTableBuilder builder, HBaseTableMapper tableMapper, RowTypeInfo typeInfo) {
+		super(builder);
+		this.tableMapper = tableMapper;
+		this.typeInfo = typeInfo;
+		this.fieldNameList = tableMapper.getKeyList();
+	}
+
+	@VisibleForTesting
+	public HBaseRowSink(Table hTable, HBaseTableMapper tableMapper, RowTypeInfo typeInfo) {
+		super(hTable);
+		this.tableMapper = tableMapper;
+		this.typeInfo = typeInfo;
+		this.fieldNameList = tableMapper.getKeyList();
+	}
+
+	@Override protected Object extract(Row value) throws Exception {
+		byte[] rowKey = HBaseTableMapper.serialize(typeInfo.getTypeAt(rowKeyFieldIndex), value.getField(rowKeyFieldIndex));
+		Put put = new Put(rowKey);
+		for (int i = 0; i < fieldNameList.length; i++) {
+			Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(fieldNameList[i]);
+			put.addColumn(colInfo.f0, colInfo.f1,
+				HBaseTableMapper.serialize(colInfo.f2, value.getField(typeInfo.getFieldIndex(fieldNameList[i]))));
+		}
+		return put;
+	}
+
+	@Override public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+		rowKeyFieldIndex = typeInfo.getFieldIndex(tableMapper.getRowKey());
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseScalaProductSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseScalaProductSink.java
new file mode 100644
index 00000000000..eeb8138610f
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseScalaProductSink.java
@@ -0,0 +1,46 @@
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+import scala.Product;
+
+/**
+ * Sink to write Scala tuples and case classes into an HBase cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink; it must extend {@link Product}
+ */
+public class HBaseScalaProductSink<IN extends Product> extends HBaseSinkBase<IN> {
+
+	private final HBaseTableMapper tableMapper;
+	private final String[] indexList;
+
+	public HBaseScalaProductSink(HBaseTableBuilder builder, HBaseTableMapper tableMapper) {
+		super(builder);
+		this.tableMapper = tableMapper;
+		this.indexList = tableMapper.getKeyList();
+	}
+
+	@VisibleForTesting
+	public HBaseScalaProductSink(Table hTable, HBaseTableMapper tableMapper) {
+		super(hTable);
+		this.tableMapper = tableMapper;
+		this.indexList = tableMapper.getKeyList();
+	}
+
+	@Override protected Object extract(IN value) throws Exception {
+		int rowKeyIndex = Integer.parseInt(tableMapper.getRowKey());
+		byte[] rowKey = HBaseTableMapper.serialize(tableMapper.getRowKeyType(), value.productElement(rowKeyIndex));
+		Put put = new Put(rowKey);
+		for (String index : indexList) {
+			Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(index);
+			put.addColumn(colInfo.f0, colInfo.f1,
+				HBaseTableMapper.serialize(colInfo.f2, value.productElement(Integer.parseInt(index))));
+		}
+		return put;
+	}
+}
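
For Scala tuples and case classes the pull request ships no example file, so the following is a minimal sketch (not part of this PR) of how the mapper and builder would be wired for this sink. It mirrors the IT case further below: the keys passed to addMapping()/setRowKey() are product-element indices, which HBaseScalaProductSink resolves via value.productElement(index). The cluster key, table name and family names are illustrative assumptions.

// Sketch only -- assumes a Scala DataStream of (String, Int, Int) named "stream" is in scope
// and that table "testTable" with families "family1"/"family2" already exists.
HBaseTableMapper tableMapper = new HBaseTableMapper()
	.addMapping(0, "family1", "col1", String.class)    // element 0 -> family1:col1
	.addMapping(1, "family2", "col1", Integer.class)   // element 1 -> family2:col1
	.addMapping(2, "family2", "col2", Integer.class)   // element 2 -> family2:col2
	.setRowKey(0, String.class);                        // element 0 is the row key

HBaseSink.addSink(stream)                               // Scala DataStream overload of addSink()
	.setClusterKey("127.0.0.1:2181:/hbase")
	.setTableMapper(tableMapper)
	.setTableName("testTable")
	.build();
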
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
new file mode 100644
index 00000000000..7a1ab577c08
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
@@ -0,0 +1,371 @@
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+
+import scala.Product;
+
+/**
+ * This class wraps different HBase sink implementations to provide a common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class HBaseSink<IN> {
+
+	private DataStreamSink<IN> sink;
+
+	private HBaseSink(DataStreamSink<IN> sink) {
+		this.sink = sink;
+	}
+
+	private SinkTransformation<IN> getSinkTransformation() {
+		return sink.getTransformation();
+	}
+
+	/**
+	 * Sets the name of this sink. This name is
+	 * used by the visualization and logging during runtime.
+	 *
+	 * @return The named sink.
+	 */
+	public HBaseSink<IN> name(String name) {
+		getSinkTransformation().setName(name);
+		return this;
+	}
+
+	/**
+	 * Sets an ID for this operator.
+	 *
+	 * <p>The specified ID is used to assign the same operator ID across job
+	 * submissions (for example when starting a job from a savepoint).
+	 *
+	 * <p><strong>Important</strong>: this ID needs to be unique per
+	 * transformation and job. Otherwise, job submission will fail.
+	 *
+	 * @param uid The unique user-specified ID of this transformation.
+	 * @return The operator with the specified ID.
+	 */
+	@PublicEvolving
+	public HBaseSink<IN> uid(String uid) {
+		getSinkTransformation().setUid(uid);
+		return this;
+	}
+
+	/**
+	 * Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
+	 *
+	 * <p>The user-provided hash is an alternative to the generated hashes, used when identifying an
+	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
+	 *
+	 * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+	 * assign a user-specified hash to intermediate nodes in an operator chain, and trying to do so will make your job fail.
+	 *
+	 * <p>A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+	 *
+	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
+	 *                 logs and web ui.
+	 * @return The operator with the user provided hash.
+	 */
+	@PublicEvolving
+	public HBaseSink<IN> setUidHash(String uidHash) {
+		getSinkTransformation().setUidHash(uidHash);
+		return this;
+	}
+
+	/**
+	 * Sets the parallelism for this sink. The degree must be higher than zero.
+	 *
+	 * @param parallelism The parallelism for this sink.
+	 * @return The sink with set parallelism.
+	 */
+	public HBaseSink<IN> setParallelism(int parallelism) {
+		getSinkTransformation().setParallelism(parallelism);
+		return this;
+	}
+
+	/**
+	 * Turns off chaining for this operator so thread co-location will not be
+	 * used as an optimization.
+	 *
+	 * <p>Chaining can be turned off for the whole job by
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()},
+	 * however this is not advised for performance reasons.
+	 *
+	 * @return The sink with chaining disabled
+	 */
+	public HBaseSink<IN> disableChaining() {
+		getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+		return this;
+	}
+
+	/**
+	 * Sets the slot sharing group of this operation. Parallel instances of
+	 * operations that are in the same slot sharing group will be co-located in the same
+	 * TaskManager slot, if possible.
+	 *
+	 * <p>Operations inherit the slot sharing group of input operations if all input operations
+	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
+	 *
+	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
+	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
+	 *
+	 * @param slotSharingGroup The slot sharing group name.
+	 */
+	public HBaseSink<IN> slotSharingGroup(String slotSharingGroup) {
+		getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
+		return this;
+	}
+
+	/**
+	 * Writes a DataStream into an HBase database.
+	 *
+	 * @param input input DataStream
+	 * @param <IN>  input type
+	 * @return HBaseSinkBuilder, to further configure the sink
+	 */
+	public static <IN> HBaseSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input) {
+		return addSink(input.javaStream());
+	}
+
+	/**
+	 * Writes a DataStream into an HBase database.
+	 *
+	 * @param input input DataStream
+	 * @param <IN>  input type
+	 * @return HBaseSinkBuilder, to further configure the sink
+	 */
+	public static <IN> HBaseSinkBuilder<IN> addSink(DataStream<IN> input) {
+		TypeInformation<IN> typeInfo = input.getType();
+		if (typeInfo instanceof TupleTypeInfo) {
+			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
+			return (HBaseSinkBuilder<IN>) new HBaseTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
+		}
+		if (typeInfo instanceof RowTypeInfo) {
+			DataStream<Row> rowInput = (DataStream<Row>) input;
+			return (HBaseSinkBuilder<IN>) new HBaseRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+		}
+		if (typeInfo instanceof PojoTypeInfo) {
+			return new HBasePojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
+		}
+		if (typeInfo instanceof CaseClassTypeInfo) {
+			DataStream<Product> productInput = (DataStream<Product>) input;
+			return (HBaseSinkBuilder<IN>) new HBaseScalaProductSinkBuilder<>(productInput, productInput.getType(), productInput.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
+		}
+		throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
+	}
+
+	/**
+	 * Builder for a {@link HBaseSink}.
+	 * @param <IN>
+	 */
+	public abstract static class HBaseSinkBuilder<IN> {
+		protected final DataStream<IN> input;
+		protected final TypeSerializer<IN> serializer;
+		protected final TypeInformation<IN> typeInfo;
+		protected HBaseTableBuilder tableBuilder;
+		protected HBaseTableMapper tableMapper;
+
+		public HBaseSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			this.input = input;
+			this.serializer = serializer;
+			this.typeInfo = typeInfo;
+			tableBuilder = new HBaseTableBuilder();
+		}
+
+		/**
+		 * Sets the cluster key of the HBase cluster to connect to.
+		 * @param clusterKey cluster key in the form "zookeeperQuorum:zookeeperClientPort:znodeParent"
+		 * @return this builder
+		 * @throws IOException
+		 */
+		public HBaseSinkBuilder<IN> setClusterKey(String clusterKey) throws IOException {
+			tableBuilder.withClusterKey(clusterKey);
+			return this;
+		}
+
+		/**
+		 * Enables the client-side write buffer for HBase.
+		 * Writes are only flushed when the buffer is full or during a checkpoint.
+		 * @return this builder
+		 */
+		public HBaseSinkBuilder<IN> enableBuffer() {
+			tableBuilder.enableBuffer(true);
+			return this;
+		}
+
+		/**
+		 * Disables the client-side write buffer for HBase.
+		 * Every write is flushed to HBase immediately, which might decrease throughput and increase latency.
+		 * @return this builder
+		 */
+		public HBaseSinkBuilder<IN> disableBuffer() {
+			tableBuilder.enableBuffer(false);
+			return this;
+		}
+
+		/**
+		 * Sets the name of the HBase table to be used.
+		 * @param tableName name of the target table
+		 * @return this builder
+		 */
+		public HBaseSinkBuilder<IN> setTableName(String tableName) {
+			tableBuilder.withTableName(tableName);
+			return this;
+		}
+
+		/**
+		 * Sets an additional configuration property for HBase.
+		 * @return this builder
+		 */
+		public HBaseSinkBuilder<IN> setProperty(String key, String value) {
+			tableBuilder.addProperty(key, value);
+			return this;
+		}
+
+		/**
+		 * Sets the mapper for the sink.
+		 * @param tableMapper {@link HBaseTableMapper} that records the mapping from a key to an HBase column family and qualifier.
+		 * @return this builder
+		 */
+		public HBaseSinkBuilder<IN> setTableMapper(HBaseTableMapper tableMapper) {
+			this.tableMapper = tableMapper;
+			return this;
+		}
+
+		/**
+		 * Finalizes the configuration of this sink.
+		 *
+		 * @return finalized sink
+		 * @throws Exception
+		 */
+		public HBaseSink<IN> build() throws Exception {
+			return createSink();
+		}
+
+		protected abstract HBaseSink<IN> createSink() throws Exception;
+
+		protected void sanityCheck() {
+			if (!tableBuilder.isClusterKeyConfigured()) {
+				throw new IllegalArgumentException("HBase cluster key must be supplied using setClusterKey().");
+			}
+			if (tableMapper == null) {
+				throw new IllegalArgumentException("HBaseTableMapper must be supplied using setTableMapper().");
+			}
+			if (tableMapper.getRowKey() == null) {
+				throw new IllegalArgumentException("Rowkey must be supplied using setRowKey() of HBaseTableMapper.");
+			}
+			if (tableMapper.getKeyList() == null || tableMapper.getKeyList().length == 0) {
+				throw new IllegalArgumentException(
+					"At least one column must be supplied using either setMapping() or addMapping().");
+			}
+			if (this.tableBuilder.getTableName() == null) {
+				throw new IllegalArgumentException("Table name must be supplied using setTableName().");
+			}
+			for (String key : tableMapper.getKeyList()) {
+				if (StringUtils.isEmpty(key)) {
+					throw new IllegalArgumentException("The keys for sink cannot be empty.");
+				}
+			}
+		}
+	}
+
+	/**
+	 * Builder for a {@link HBaseTupleSink}.
+	 * @param <IN>
+	 */
+	public static class HBaseTupleSinkBuilder<IN extends Tuple> extends HBaseSinkBuilder<IN> {
+
+		public HBaseTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			for (String key : tableMapper.getKeyList()) {
+				if (!StringUtils.isNumeric(key)) {
+					throw new IllegalArgumentException("The key: " + key + " for a tuple sink must be a numeric index into the tuple.");
+				}
+			}
+		}
+
+		@Override
+		protected HBaseSink<IN> createSink() throws Exception {
+			return new HBaseSink<>(input.addSink(new HBaseTupleSink<IN>(tableBuilder, tableMapper)));
+		}
+	}
+
+	/**
+	 * Builder for a {@link HBaseRowSink}.
+	 */
+	public static class HBaseRowSinkBuilder extends HBaseSinkBuilder<Row> {
+
+		public HBaseRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected HBaseSink<Row> createSink() throws Exception {
+			return new HBaseSink<>(input.addSink(new HBaseRowSink(tableBuilder, tableMapper, (RowTypeInfo) typeInfo)));
+		}
+	}
+
+	/**
+	 * Builder for a {@link HBasePojoSink}.
+	 * @param <IN>
+	 */
+	public static class HBasePojoSinkBuilder<IN> extends HBaseSinkBuilder<IN> {
+
+		public HBasePojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override protected HBaseSink<IN> createSink() throws Exception {
+			return new HBaseSink<>(input.addSink(new HBasePojoSink<IN>(tableBuilder, tableMapper, typeInfo)));
+		}
+	}
+
+	/**
+	 * Builder for a {@link HBaseScalaProductSink}.
+	 * @param <IN>
+	 */
+	public static class HBaseScalaProductSinkBuilder<IN extends Product> extends HBaseSinkBuilder<IN> {
+
+		public HBaseScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override protected HBaseSink<IN> createSink() throws Exception {
+			return new HBaseSink<>(input.addSink(new HBaseScalaProductSink<IN>(tableBuilder, tableMapper)));
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			for (String key : tableMapper.getKeyList()) {
+				if (!StringUtils.isNumeric(key)) {
+					throw new IllegalArgumentException("The key: " + key + " for a scala product sink must be a numeric index of a product element.");
+				}
+			}
+		}
+	}
+}
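
To make the builder flow above concrete, here is a minimal usage sketch (not part of this PR) for a Tuple stream; addSink() detects the TupleTypeInfo and hands back an HBaseTupleSinkBuilder, and the mapper keys are tuple field indices. The cluster key, table name and family names are assumptions carried over from the examples in this PR.

// Sketch only -- imports and error handling omitted; assumes "testTable" already exists
// with column families "family1" and "family2".
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple3<String, Integer, Integer>> source =
	env.fromElements(Tuple3.of("row-1", 1, 0), Tuple3.of("row-2", 2, 1));

HBaseSink.addSink(source)
	.setClusterKey("127.0.0.1:2181:/hbase")
	.setTableMapper(
		new HBaseTableMapper()
			.addMapping(0, "family1", "col1", String.class)
			.addMapping(1, "family2", "col1", Integer.class)
			.addMapping(2, "family2", "col2", Integer.class)
			.setRowKey(0, String.class))
	.setTableName("testTable")
	.build();

env.execute("HBase Tuple sink example");
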
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkBase.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkBase.java
new file mode 100644
index 00000000000..efe2328f765
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkBase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HBaseSinkBase is the common abstract class of {@link HBasePojoSink}, {@link HBaseTupleSink}, {@link HBaseRowSink} and
+ * {@link HBaseScalaProductSink}.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class HBaseSinkBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	protected Connection connection;
+	protected transient Table hTable;
+
+	private final HBaseTableBuilder builder;
+
+	public HBaseSinkBase(HBaseTableBuilder builder) {
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	@VisibleForTesting
+	public HBaseSinkBase(Table hTable){
+		this.builder = null;
+		this.connection = null;
+		this.hTable = hTable;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+		if (builder != null) {
+			this.connection = builder.buildConnection();
+		}
+		if (connection != null) {
+			this.hTable = builder.buildTable(connection);
+		}
+	}
+
+	@Override
+	public void invoke(IN value, Context context) throws Exception {
+		Object obj = extract(value);
+		if (obj == null) {
+			return;
+		} else if (obj instanceof Put) {
+			this.hTable.put((Put) obj);
+		} else if (obj instanceof Delete) {
+			this.hTable.delete((Delete) obj);
+		}
+	}
+
+	protected abstract Object extract(IN value) throws Exception;
+
+	@Override public void close() throws Exception {
+		super.close();
+		try {
+			if (this.hTable != null) {
+				this.hTable.close();
+			}
+		} catch (Throwable t) {
+			log.error("Error while closing HBase table.", t);
+		}
+		try {
+			if (this.connection != null) {
+				this.connection.close();
+			}
+		} catch (Throwable t) {
+			log.error("Error while closing HBase connection.", t);
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		if (this.hTable != null && this.hTable instanceof HTable) {
+			((HTable) this.hTable).flushCommits();
+		}
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception { }
+
+	protected byte[] getByteArray(Object obj, Class clazz) {
+		if (obj == null) {
+			return new byte[0];
+		}
+		if (clazz == Integer.class) {
+			return Bytes.toBytes((Integer) obj);
+		} else if (clazz == Long.class) {
+			return Bytes.toBytes((Long) obj);
+		} else if (clazz == String.class) {
+			return Bytes.toBytes((String) obj);
+		} else if (clazz == Byte.class) {
+			return Bytes.toBytes((Byte) obj);
+		} else if (clazz == Short.class) {
+			return Bytes.toBytes((Short) obj);
+		} else if (clazz == Float.class) {
+			return Bytes.toBytes((Float) obj);
+		} else if (clazz == Double.class) {
+			return Bytes.toBytes((Double) obj);
+		} else if (clazz == Character.class) {
+			return Bytes.toBytes((Character) obj);
+		} else if (clazz == Void.class) {
+			return new byte[0];
+		} else {
+			return Bytes.toBytes(obj.toString());
+		}
+	}
+}
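
The write model is left to extract(): returning a Put writes the record, returning a Delete removes it, and returning null skips it (see invoke() above). As an illustration only, a hypothetical subclass that turns null payloads into deletions could look like this; the class, field and column names are made up and not part of the PR.

// Hypothetical subclass -- emits a Delete for null payloads, otherwise a Put.
public class UpsertOrDeleteSink extends HBaseSinkBase<Tuple2<String, String>> {

	public UpsertOrDeleteSink(HBaseTableBuilder builder) {
		super(builder);
	}

	@Override
	protected Object extract(Tuple2<String, String> value) {
		byte[] rowKey = Bytes.toBytes(value.f0);
		if (value.f1 == null) {
			return new Delete(rowKey);  // removes the whole row
		}
		Put put = new Put(rowKey);
		put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("payload"), Bytes.toBytes(value.f1));
		return put;
	}
}
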
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTableBuilder.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTableBuilder.java
new file mode 100644
index 00000000000..7a490edeaa4
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTableBuilder.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is used to configure a {@link Connection} and a {@link Table} after deployment.
+ * The connection represents the connection that will be established to HBase.
+ * The table represents a table that can be manipulated in HBase.
+ */
+public class HBaseTableBuilder implements Serializable {
+
+	// Configuration is not Serializable
+	private Map<String, String> configurationMap = new HashMap<>();
+
+	private String tableName;
+	private boolean bufferEnabled = false;
+	private boolean clusterKeyConfigured = false;
+
+	public HBaseTableBuilder withClusterKey(String clusterKey) throws IOException {
+		mergeClusterkeyToConfiguration(clusterKey);
+		clusterKeyConfigured = true;
+		return this;
+	}
+
+	public HBaseTableBuilder withTableName(String tableName) {
+		this.tableName = tableName;
+		return this;
+	}
+
+	public HBaseTableBuilder enableBuffer(boolean bufferEnabled) {
+		this.bufferEnabled = bufferEnabled;
+		return this;
+	}
+
+	public HBaseTableBuilder addProperty(String key, String value) {
+		configurationMap.put(key, value);
+		return this;
+	}
+
+	public boolean isClusterKeyConfigured() {
+		return clusterKeyConfigured;
+	}
+
+	public String getTableName() {
+		return tableName;
+	}
+
+	public Connection buildConnection() throws IOException {
+		Configuration configuration = new Configuration();
+		for (String key : configurationMap.keySet()) {
+			configuration.set(key, configurationMap.get(key));
+		}
+		return ConnectionFactory.createConnection(configuration);
+	}
+
+	public Table buildTable(Connection connection) throws IOException {
+		Table hTable = connection.getTable(TableName.valueOf(tableName));
+
+		if (hTable instanceof HTable && bufferEnabled) {
+			((HTable) hTable).setAutoFlush(false, false);
+		}
+		Admin admin = connection.getAdmin();
+		try {
+			if (!admin.isTableAvailable(TableName.valueOf(this.tableName))) {
+				throw new IOException("Table is not available.");
+			}
+		} finally {
+			try {
+				if (admin != null) {
+					admin.close();
+				}
+			} catch (Throwable t) {
+				// ignore exceptions while closing the HBase admin
+			}
+		}
+		return hTable;
+	}
+
+	private void mergeClusterkeyToConfiguration(String clusterKey)
+		throws IOException {
+		if (clusterKey == null) {
+			throw new IOException("ClusterKey is null.");
+		}
+		String[] segments = clusterKey.split(":");
+		if (segments.length > 3) {
+			throw new IOException("ClusterKey:[" + clusterKey + "] is illegal.");
+		}
+		if (segments.length > 0) {
+			configurationMap.put(HConstants.ZOOKEEPER_QUORUM, segments[0]);
+		}
+		if (segments.length > 1) {
+			configurationMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, segments[1]);
+		}
+		if (segments.length > 2) {
+			configurationMap.put(HConstants.ZOOKEEPER_ZNODE_PARENT, segments[2]);
+		}
+	}
+}
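
The cluster key accepted by withClusterKey() is split on ':' into at most three segments that map onto the standard HBase client settings. A short sketch (the hosts, port and table name are assumptions, not part of the PR):

// "host1,host2,host3:2181:/hbase" is merged into the configuration as:
//   hbase.zookeeper.quorum              = host1,host2,host3   (HConstants.ZOOKEEPER_QUORUM)
//   hbase.zookeeper.property.clientPort = 2181                (HConstants.ZOOKEEPER_CLIENT_PORT)
//   zookeeper.znode.parent              = /hbase              (HConstants.ZOOKEEPER_ZNODE_PARENT)
HBaseTableBuilder builder = new HBaseTableBuilder()
	.withClusterKey("host1,host2,host3:2181:/hbase")   // throws IOException for a null or malformed key
	.withTableName("testTable")
	.enableBuffer(true);  // buffered writes are flushed by flushCommits() on each checkpoint
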
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTableMapper.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTableMapper.java
new file mode 100644
index 00000000000..f40280384a8
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTableMapper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * This class is used to record the mapping from a key to an HBase column family and qualifier, as well as the
+ * row key and its type.
+ */
+public class HBaseTableMapper implements Serializable {
+
+	private final Map<String, Tuple3<String, String, TypeInformation<?>>> keyMap = new TreeMap<>();
+	private String rowKey;
+	private TypeInformation<?> rowKeyType;
+	private String charset = "UTF-8";
+
+	public HBaseTableMapper addMapping(String key, String columnFamily, String qualifier, Class clazz) {
+		keyMap.put(key, Tuple3.of(columnFamily, qualifier, TypeExtractor.getForClass(clazz)));
+		return this;
+	}
+
+	public HBaseTableMapper addMapping(int key, String columnFamily, String qualifier, Class clazz) {
+		keyMap.put(String.valueOf(key), Tuple3.of(columnFamily, qualifier, TypeExtractor.getForClass(clazz)));
+		return this;
+	}
+
+	public HBaseTableMapper setMapping(RowTypeInfo rowTypeInfo, String columnFamily) {
+		String[] fieldNames = rowTypeInfo.getFieldNames();
+		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+			String fieldName = fieldNames[i];
+			TypeInformation typeInfo = rowTypeInfo.getTypeAt(i);
+			keyMap.put(fieldName, Tuple3.of(columnFamily, fieldName, typeInfo));
+		}
+		return this;
+	}
+
+	public <T> HBaseTableMapper setMapping(PojoTypeInfo<T> typeInfo, String columnFamily) {
+		for (int i = 0; i < typeInfo.getArity(); i++) {
+			PojoField pojoField = typeInfo.getPojoFieldAt(i);
+			String fieldName = pojoField.getField().getName();
+			TypeInformation<?> fieldTypeInfo = pojoField.getTypeInformation();
+			keyMap.put(fieldName, Tuple3.of(columnFamily, fieldName, fieldTypeInfo));
+		}
+		return this;
+	}
+
+	public HBaseTableMapper setRowKey(String key, Class clazz) {
+		this.rowKey = key;
+		this.rowKeyType = TypeExtractor.getForClass(clazz);
+		return this;
+	}
+
+	public HBaseTableMapper setRowKey(int keyIndex, Class clazz) {
+		this.rowKey = String.valueOf(keyIndex);
+		this.rowKeyType = TypeExtractor.getForClass(clazz);
+		return this;
+	}
+
+	public String getRowKey() {
+		return rowKey;
+	}
+
+	public TypeInformation<?> getRowKeyType() {
+		return rowKeyType;
+	}
+
+	public Tuple3<byte[], byte[], TypeInformation<?>> getColInfo(String key) {
+		Tuple3<String, String, TypeInformation<?>> rowInfo = keyMap.get(key);
+		return Tuple3.of(rowInfo.f0.getBytes(), rowInfo.f1.getBytes(), rowInfo.f2);
+	}
+
+	public String[] getKeyList() {
+		Set<String> keySet = keyMap.keySet();
+		String[] keyList = new String[keySet.size()];
+		int i = 0;
+		for (String key : keySet) {
+			keyList[i++] = key;
+		}
+		return keyList;
+	}
+
+	public String getCharset() {
+		return charset;
+	}
+
+	public void setCharset(String charset) {
+		this.charset = charset;
+	}
+
+	public static byte[] serialize(TypeInformation<?> typeInfo, Object obj) throws Exception {
+		Class clazz = typeInfo.getTypeClass();
+		if (byte[].class.equals(clazz)) {
+			return (byte[]) obj;
+		} else if (String.class.equals(clazz)) {
+			return Bytes.toBytes((String) obj);
+		} else if (Byte.class.equals(clazz)) {
+			return Bytes.toBytes((Byte) obj);
+		} else if (Short.class.equals(clazz)) {
+			return Bytes.toBytes((Short) obj);
+		} else if (Integer.class.equals(clazz)) {
+			return Bytes.toBytes((Integer) obj);
+		} else if (Long.class.equals(clazz)) {
+			return Bytes.toBytes((Long) obj);
+		} else if (Float.class.equals(clazz)) {
+			return Bytes.toBytes((Float) obj);
+		} else if (Double.class.equals(clazz)) {
+			return Bytes.toBytes((Double) obj);
+		} else if (Boolean.class.equals(clazz)) {
+			return Bytes.toBytes((Boolean) obj);
+		} else if (Timestamp.class.equals(clazz)) {
+			return Bytes.toBytes(((Timestamp) obj).getTime());
+		} else if (Date.class.equals(clazz)) {
+			return Bytes.toBytes(((Date) obj).getTime());
+		} else if (Time.class.equals(clazz)) {
+			return Bytes.toBytes(((Time) obj).getTime());
+		} else if (BigDecimal.class.equals(clazz)) {
+			return Bytes.toBytes((BigDecimal) obj);
+		} else if (BigInteger.class.equals(clazz)) {
+			return ((BigInteger) obj).toByteArray();
+		} else {
+			throw new Exception("Unsupported type " + clazz.getName());
+		}
+	}
+
+}
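
serialize() turns a value into bytes based on its TypeInformation: most primitives and String go through Bytes.toBytes, Timestamp/Date/Time are written as their epoch-millisecond longs, BigInteger uses toByteArray(), and any other type raises an exception. A small hedged illustration (the values are arbitrary):

// Both calls below are equivalent to the corresponding Bytes.toBytes(...) invocation.
byte[] asLong = HBaseTableMapper.serialize(TypeExtractor.getForClass(Long.class), 42L);
// == Bytes.toBytes(42L)

byte[] asMillis = HBaseTableMapper.serialize(
	TypeExtractor.getForClass(Timestamp.class), new Timestamp(1535545200000L));
// == Bytes.toBytes(1535545200000L) -- the epoch millis, not a formatted string
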
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTupleSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTupleSink.java
new file mode 100644
index 00000000000..043c45a14e1
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseTupleSink.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * Sink to write tuple-like values into an HBase cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink; it must extend {@link Tuple}
+ */
+public class HBaseTupleSink<IN extends Tuple> extends HBaseSinkBase<IN> {
+
+	private final HBaseTableMapper tableMapper;
+	private final String[] indexList;
+
+	public HBaseTupleSink(HBaseTableBuilder builder, HBaseTableMapper tableMapper) {
+		super(builder);
+		this.tableMapper = tableMapper;
+		this.indexList = tableMapper.getKeyList();
+	}
+
+	@VisibleForTesting
+	public HBaseTupleSink(Table hTable, HBaseTableMapper tableMapper) {
+		super(hTable);
+		this.tableMapper = tableMapper;
+		this.indexList = tableMapper.getKeyList();
+	}
+
+	@Override protected Object extract(IN value) throws Exception {
+		int rowKeyIndex = Integer.parseInt(tableMapper.getRowKey());
+		byte[] rowKey = HBaseTableMapper.serialize(tableMapper.getRowKeyType(), value.getField(rowKeyIndex));
+		Put put = new Put(rowKey);
+		for (String index : indexList) {
+			Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(index);
+			put.addColumn(colInfo.f0, colInfo.f1,
+				HBaseTableMapper.serialize(colInfo.f2, value.getField(Integer.parseInt(index))));
+		}
+		return put;
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkITCase.java
new file mode 100644
index 00000000000..d7633250e10
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkITCase.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * IT cases for all HBase sinks.
+ */
+public class HBaseSinkITCase extends HBaseTestingClusterAutostarter {
+
+	private static final String TEST_TABLE = "testTable";
+
+	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+	private static final ArrayList<TestPojo> pojoCollection = new ArrayList<>(20);
+	private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
+	private static final List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection = new ArrayList<>(20);
+
+	private static HTable table;
+
+	static {
+		for (int i = 0; i < 20; i++) {
+			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+			pojoCollection.add(new TestPojo(UUID.randomUUID().toString(), i, 0));
+			rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
+			scalaTupleCollection.add(new scala.Tuple3<>(UUID.randomUUID().toString(), i, 0));
+		}
+	}
+
+	private static final String FAMILY1 = "family1";
+	private static final String F1COL1 = "col1";
+
+	private static final String FAMILY2 = "family2";
+	private static final String F2COL1 = "col1";
+	private static final String F2COL2 = "col2";
+
+	@BeforeClass public static void activateHBaseCluster() throws Exception {
+		registerHBaseMiniClusterInClasspath();
+		table = prepareTable();
+	}
+
+	private static HTable prepareTable() throws IOException {
+		// create a table
+		TableName tableName = TableName.valueOf(TEST_TABLE);
+		// column families
+		byte[][] families = new byte[][]{
+			Bytes.toBytes(FAMILY1),
+			Bytes.toBytes(FAMILY2),
+		};
+		// split keys
+		byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
+
+		createTable(tableName, families, splitKeys);
+
+		// get the HTable instance
+		HTable table = openTable(tableName);
+		return table;
+	}
+
+	// ####### HBaseSink tests ############
+
+	@Test
+	public void testHBaseTupleSink() throws Exception {
+		HBaseTableMapper tableMapper = new HBaseTableMapper();
+		tableMapper.addMapping(0, FAMILY1, F1COL1, String.class)
+			.addMapping(1, FAMILY2, F2COL1, Integer.class)
+			.addMapping(2, FAMILY2, F2COL2, Integer.class)
+			.setRowKey(0, String.class);
+
+		ArrayList<byte[]> rowKeys = new ArrayList<>();
+		String[] keyList = tableMapper.getKeyList();
+		Map<String, Tuple3<byte[], byte[], byte[]>> validationMap = new HashMap<>();
+
+		HBaseTupleSink<Tuple3<String, Integer, Integer>> sink = new HBaseTupleSink<>(table, tableMapper);
+
+		Configuration configuration = new Configuration();
+		sink.open(configuration);
+
+		for (Tuple3<String, Integer, Integer> value : collection) {
+			sink.invoke(value, SinkContextUtil.forTimestamp(0));
+			rowKeys.add(Bytes.toBytes(value.f0));
+			validationMap.put(value.f0, Tuple3.of(Bytes.toBytes(value.f0), Bytes.toBytes(value.f1), Bytes.toBytes(value.f2)));
+		}
+		sink.close();
+		for (byte[] rowKey : rowKeys) {
+			// the fields are processed in the order of keyList
+			for (int i = 0; i < keyList.length; i++) {
+				Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(keyList[i]);
+				byte[] result = table.get(new Get(rowKey)).getValue(colInfo.f0, colInfo.f1);
+				byte[] validation = validationMap.get(new String(rowKey, tableMapper.getCharset())).getField(i);
+				Assert.assertTrue(Arrays.equals(result, validation));
+			}
+		}
+	}
+
+	@Test
+	public void testHBasePojoSink() throws Exception {
+		HBaseTableMapper tableMapper = new HBaseTableMapper();
+		tableMapper.addMapping("key", FAMILY1, F1COL1, String.class)
+			.addMapping("value", FAMILY2, F2COL1, Integer.class)
+			.addMapping("oldValue", FAMILY2, F2COL2, Integer.class)
+			.setRowKey("key", String.class);
+
+		ArrayList<byte[]> rowKeys = new ArrayList<>();
+		String[] keyList = tableMapper.getKeyList();
+		Map<String, TestPojo> validationMap = new HashMap<>();
+
+		HBasePojoSink<TestPojo> sink = new HBasePojoSink<TestPojo>(table, tableMapper, TypeInformation.of(TestPojo.class));
+
+		Configuration configuration = new Configuration();
+		sink.open(configuration);
+
+		for (TestPojo value : pojoCollection) {
+			sink.invoke(value, SinkContextUtil.forTimestamp(0));
+			rowKeys.add(Bytes.toBytes(value.getKey()));
+			validationMap.put(value.getKey(), value);
+		}
+		sink.close();
+
+		for (byte[] rowKey : rowKeys) {
+			for (int i = 0; i < keyList.length; i++) {
+				Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(keyList[i]);
+				byte[] result = table.get(new Get(rowKey)).getValue(colInfo.f0, colInfo.f1);
+				Field field = TestPojo.class.getDeclaredField(keyList[i]);
+				field.setAccessible(true);
+				Object fieldValue = field.get(validationMap.get(new String(rowKey, tableMapper.getCharset())));
+				byte[] validation = HBaseTableMapper.serialize(colInfo.f2, fieldValue);
+				Assert.assertTrue(Arrays.equals(result, validation));
+			}
+		}
+	}
+
+	@Test
+	public void testHBaseScalaProductSink() throws Exception {
+		HBaseTableMapper tableMapper = new HBaseTableMapper();
+		tableMapper.addMapping(0, FAMILY1, F1COL1, String.class)
+			.addMapping(1, FAMILY2, F2COL1, Integer.class)
+			.addMapping(2, FAMILY2, F2COL2, Integer.class)
+			.setRowKey(0, String.class);
+
+		ArrayList<byte[]> rowKeys = new ArrayList<>();
+		String[] keyList = tableMapper.getKeyList();
+		Map<String, scala.Tuple3<String, Integer, Integer>> validationMap = new HashMap<>();
+
+		HBaseScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink =
+			new HBaseScalaProductSink<scala.Tuple3<String, Integer, Integer>>(table, tableMapper);
+		for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
+			sink.invoke(value, SinkContextUtil.forTimestamp(0));
+			rowKeys.add(Bytes.toBytes(value._1()));
+			validationMap.put(value._1(), value);
+		}
+		sink.close();
+		for (byte[] rowKey : rowKeys) {
+			for (int i = 0; i < keyList.length; i++) {
+				Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(keyList[i]);
+				byte[] result = table.get(new Get(rowKey)).getValue(colInfo.f0, colInfo.f1);
+				scala.Tuple3<String, Integer, Integer> validationTuple =
+					validationMap.get(new String(rowKey, tableMapper.getCharset()));
+				byte[] validation = HBaseTableMapper.serialize(colInfo.f2, validationTuple.productElement(i));
+				Assert.assertTrue(Arrays.equals(result, validation));
+			}
+		}
+	}
+
+	@Test
+	public void testHBaseRowSink() throws Exception {
+		HBaseTableMapper tableMapper = new HBaseTableMapper();
+		tableMapper.addMapping("key", FAMILY1, F1COL1, String.class)
+			.addMapping("value", FAMILY2, F2COL1, Integer.class)
+			.addMapping("oldValue", FAMILY2, F2COL2, Integer.class)
+			.setRowKey("key", String.class);
+
+		ArrayList<byte[]> rowKeys = new ArrayList<>();
+		String[] keyList = tableMapper.getKeyList();
+		Map<String, Row> validationMap = new HashMap<>();
+
+		TypeInformation[] types = {Types.STRING(), Types.INT(), Types.INT()};
+		String[] fieldNames = {"key", "value", "oldValue"};
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+
+		HBaseRowSink sink = new HBaseRowSink(table, tableMapper, rowTypeInfo);
+
+		Configuration configuration = new Configuration();
+		sink.open(configuration);
+
+		for (Row value : rowCollection) {
+			sink.invoke(value, SinkContextUtil.forTimestamp(0));
+			String valueKey = (String) value.getField(rowTypeInfo.getFieldIndex("key"));
+			rowKeys.add(Bytes.toBytes(valueKey));
+			validationMap.put(valueKey, value);
+		}
+		sink.close();
+
+		for (byte[] rowKey : rowKeys) {
+			for (int i = 0; i < keyList.length; i++) {
+				Tuple3<byte[], byte[], TypeInformation<?>> colInfo = tableMapper.getColInfo(keyList[i]);
+				byte[] result = table.get(new Get(rowKey)).getValue(colInfo.f0, colInfo.f1);
+				byte[] validation = HBaseTableMapper.serialize(colInfo.f2,
+					validationMap.get(new String(rowKey, tableMapper.getCharset()))
+						.getField(rowTypeInfo.getFieldIndex(keyList[i])));
+				Assert.assertTrue(Arrays.equals(result, validation));
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/TestPojo.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/TestPojo.java
new file mode 100644
index 00000000000..bcc3430ebe2
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/TestPojo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+/**
+ * Test Pojo for HBase.
+ */
+public class TestPojo {
+	private String key;
+	private Integer value;
+	private Integer oldValue;
+
+	public TestPojo(String key, Integer value, Integer oldValue) {
+		this.key = key;
+		this.value = value;
+		this.oldValue = oldValue;
+	}
+
+	public String getKey() {
+		return key;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public Integer getValue() {
+		return value;
+	}
+
+	public void setValue(Integer value) {
+		this.value = value;
+	}
+
+	public Integer getOldValue() {
+		return oldValue;
+	}
+
+	public void setOldValue(Integer oldValue) {
+		this.oldValue = oldValue;
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBasePojoSinkExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBasePojoSinkExample.java
new file mode 100644
index 00000000000..2ec590166f4
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBasePojoSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase.example;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.hbase.HBaseSink;
+import org.apache.flink.streaming.connectors.hbase.HBaseTableMapper;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+/**
+ * This is an example showing how to use the Pojo HBase sink in the Streaming API.
+ *
+ * <p>The example assumes that the target table already exists in a local HBase instance.
+ */
+public class HBasePojoSinkExample {
+	private static final ArrayList<Message> messages = new ArrayList<>(20);
+	private static final String TEST_TABLE = "testTable";
+
+	static {
+		for (int i = 0; i < 20; i++) {
+			messages.add(new Message(UUID.randomUUID().toString(), i, i * 10));
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Message> source = env.fromCollection(messages);
+
+		HBaseSink.addSink(source)
+			.setClusterKey("127.0.0.1:2181:/hbase")
+			.setTableMapper(
+				new HBaseTableMapper()
+					.addMapping("key", "CF1", "key", String.class)
+					.addMapping("value", "CF2", "value", Integer.class)
+					.addMapping("oldValue", "CF2", "oldValue", Integer.class)
+					.setRowKey("key", String.class)
+			)
+			.setTableName(TEST_TABLE)
+			.build();
+
+		env.execute("HBase Sink example");
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseRowSinkExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseRowSinkExample.java
new file mode 100644
index 00000000000..7276927feff
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseRowSinkExample.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase.example;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.hbase.HBaseSink;
+import org.apache.flink.streaming.connectors.hbase.HBaseTableMapper;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+/**
+ * This is an example showing how to use the Row HBase Sink in the Streaming API.
+ *
+ * <p>The example assumes that a table named "testTable" with column families "CF1" and "CF2"
+ * exists in a local HBase instance.
+ */
+public class HBaseRowSinkExample {
+	private static final ArrayList<Row> messages = new ArrayList<>(20);
+	private static final String TEST_TABLE = "testTable";
+
+	static {
+		for (int i = 0; i < 20; i++) {
+			messages.add(Row.of(UUID.randomUUID().toString(), i, i * 10));
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
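+		// A Row carries no field names of its own, so a RowTypeInfo supplies the types and the
+		// names ("key", "value", "oldValue") that the table mapper refers to below.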
+		TypeInformation<?>[] types = {Types.STRING, Types.INT, Types.INT};
+
+		String[] fields = {"key", "value", "oldValue"};
+
+		DataStreamSource<Row> source = env.fromCollection(messages, new RowTypeInfo(types, fields));
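+		// The mapper addresses Row fields by the names declared in the RowTypeInfo above.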
+		HBaseSink.addSink(source)
+			.setClusterKey("127.0.0.1:2181:/hbase")
+			.setTableMapper(
+				new HBaseTableMapper()
+					.addMapping("key", "CF1", "key", String.class)
+					.addMapping("value", "CF2", "value", Integer.class)
+					.addMapping("oldValue", "CF2", "oldValue", Integer.class)
+					.setRowKey("key", String.class)
+			)
+			.setTableName(TEST_TABLE)
+			.build();
+
+		env.execute("HBase Sink example");
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseTupleSinkExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseTupleSinkExample.java
new file mode 100644
index 00000000000..82889aa1633
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseTupleSinkExample.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase.example;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.hbase.HBaseSink;
+import org.apache.flink.streaming.connectors.hbase.HBaseTableMapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the Tuple HBase Sink in the Streaming API.
+ *
+ * <p>The example assumes that a table named "testTable" with column families "CF1" and "CF2"
+ * exists in a local HBase instance.
+ */
+public class HBaseTupleSinkExample {
+	private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
+	private static final String TEST_TABLE = "testTable";
+
+	static {
+		for (int i = 0; i < 20; i++) {
+			collection.add(new Tuple2<>("hbase-" + i, i));
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
+
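+		// For tuples the mapper addresses fields by position: index 0 (f0) serves as the row key and
+		// the CF1:key column, while index 1 (f1) is written to the CF2:value column.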
+		HBaseSink.addSink(source)
+			.setClusterKey("127.0.0.1:2181:/hbase")
+			.setTableMapper(
+				new HBaseTableMapper()
+					.addMapping(0, "CF1", "key", String.class)
+					.addMapping(1, "CF2", "value", Integer.class)
+					.setRowKey(0, String.class))
+			.setTableName(TEST_TABLE)
+			.build();
+
+		env.execute("HBase Sink example");
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/Message.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/Message.java
new file mode 100644
index 00000000000..18858eebfeb
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/Message.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase.example;
+
+/**
+ * Pojo used by the Flink HBase sink examples. It satisfies Flink's Pojo requirements:
+ * a public no-argument constructor and getters and setters for every field.
+ */
+public class Message {
+
+	private String key;
+	private Integer value;
+	private Integer oldValue;
+
+	public Message() {
+		this(null, null, null);
+	}
+
+	public Message(String key, Integer value, Integer oldValue) {
+		this.key = key;
+		this.value = value;
+		this.oldValue = oldValue;
+	}
+
+	public String getKey() {
+		return key;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public Integer getValue() {
+		return value;
+	}
+
+	public void setValue(Integer value) {
+		this.value = value;
+	}
+
+	public Integer getOldValue() {
+		return oldValue;
+	}
+
+	public void setOldValue(Integer oldValue) {
+		this.oldValue = oldValue;
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/scala/org/apache/flink/streaming/connectors/hbase/example/HBaseScalaProductSink.scala b/flink-connectors/flink-hbase/src/test/scala/org/apache/flink/streaming/connectors/hbase/example/HBaseScalaProductSink.scala
new file mode 100644
index 00000000000..681a45aa45a
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/scala/org/apache/flink/streaming/connectors/hbase/example/HBaseScalaProductSink.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase.example
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.hbase.{HBaseSink, HBaseTableMapper}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * This is an example showing how to use the Scala Product HBase Sink in the Streaming API.
+  *
+  * <p>The example assumes that a table named "testTable" with column families "CF1" and "CF2"
+  * exists in a local HBase instance.
+  */
+object HBaseScalaProductSink {
+  val TEST_TABLE = "testTable"
+  case class TestCaseClass(key: String, value: Int)
+
+  val collection = new ArrayBuffer[TestCaseClass](20)
+
+  for (i <- 0 until 20) {
+    collection += TestCaseClass("hbase-" + i, i)
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
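+    // The org.apache.flink.api.scala._ import provides the implicit TypeInformation
+    // needed by fromCollection.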
+    import org.apache.flink.api.scala._
+    val source = env.fromCollection(collection)
+
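+    // As with tuples, case class fields are addressed by position: field 0 ("key") is both the
+    // row key and the CF1:key column, field 1 ("value") is written to the CF2:value column.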
+    HBaseSink.addSink(source)
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      .setTableMapper(
+        new HBaseTableMapper()
+          .addMapping(0, "CF1", "key", classOf[String])
+          .addMapping(1, "CF2", "value", classOf[Integer])
+          .setRowKey(0, classOf[String]))
+      .setTableName(TEST_TABLE)
+      .build
+
+    env.execute("HBase Sink example")
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services