Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/17 06:04:15 UTC

[flink] branch release-1.11 updated: [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new b2711c5  [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData
b2711c5 is described below

commit b2711c5d7d197735e2cbedf4e675273ef7e9a3bb
Author: Leonard Xu <xb...@163.com>
AuthorDate: Wed Jun 17 14:01:36 2020 +0800

    [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData
    
    This closes #12594
---
 .../hbase/source/HBaseDynamicTableSource.java      |   2 +-
 .../hbase/source/HBaseLookupFunction.java          |   2 +-
 ...nction.java => HBaseRowDataLookupFunction.java} |  51 +--
 .../flink/connector/hbase/util/HBaseSerde.java     |  26 ++
 .../connector/hbase/HBaseConnectorITCase.java      | 423 ++++++++-------------
 .../hbase/HBaseDynamicTableFactoryTest.java        |   6 +-
 .../flink/connector/hbase/util/HBaseTestBase.java  |  68 +++-
 7 files changed, 280 insertions(+), 298 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index 1dac67ae..537d4b2 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -77,7 +77,7 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour
 				.isPresent(),
 			"Currently, HBase table only supports lookup by rowkey field.");
 
-		return TableFunctionProvider.of(new HBaseLookupFunction(conf, tableName, hbaseSchema));
+		return TableFunctionProvider.of(new HBaseRowDataLookupFunction(conf, tableName, hbaseSchema, nullStringLiteral));
 	}
 
 	@Override
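
With this one-line change, the lookup runtime handed to the planner switches from the Row-based to the RowData-based function. A hedged sketch of how that is observable, mirroring the factory test further down (the `hbaseSource` and `lookupContext` objects are assumed to be set up as in that test):

    LookupTableSource.LookupRuntimeProvider provider =
        hbaseSource.getLookupRuntimeProvider(lookupContext);
    TableFunction<?> fn = ((TableFunctionProvider<?>) provider).createTableFunction();
    // before this patch: fn instanceof HBaseLookupFunction (produces Row)
    // after this patch:  fn instanceof HBaseRowDataLookupFunction (produces RowData)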
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
index 1d608e9..c84cbe1 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
@@ -45,7 +45,7 @@ import java.io.IOException;
 
 /**
  * The HBaseLookupFunction is a standard user-defined table function, it can be used in tableAPI
- * and also useful for temporal table join plan in SQL.
+ * and is also useful for temporal table join plans in SQL. It looks up the result as {@link Row}.
  */
 @Internal
 public class HBaseLookupFunction extends TableFunction<Row> {
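
The Row-based function above is kept for the legacy connector path; the new class below is a copy that emits the blink planner's internal RowData instead. A minimal illustration of the difference between the two result types (a hedged sketch; the two-field row is invented for illustration):

    // external type, as collected by HBaseLookupFunction:
    Row external = Row.of(4, "Welt-4");
    // internal type, as collected by HBaseRowDataLookupFunction;
    // note that strings are represented as StringData internally:
    GenericRowData internal = GenericRowData.of(4, StringData.fromString("Welt-4"));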
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
similarity index 78%
copy from flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
copy to flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 1d608e9..6a36fa3 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -20,22 +20,21 @@ package org.apache.flink.connector.hbase.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
-import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.slf4j.Logger;
@@ -44,29 +43,33 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * The HBaseLookupFunction is a standard user-defined table function, it can be used in tableAPI
- * and also useful for temporal table join plan in SQL.
+ * The HBaseRowDataLookupFunction is a standard user-defined table function; it can be used in the Table API
+ * and is also useful for temporal table join plans in SQL. It looks up the result as {@link RowData}.
  */
 @Internal
-public class HBaseLookupFunction extends TableFunction<Row> {
-	private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
+public class HBaseRowDataLookupFunction extends TableFunction<RowData> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
 	private static final long serialVersionUID = 1L;
 
 	private final String hTableName;
 	private final byte[] serializedConfig;
 	private final HBaseTableSchema hbaseTableSchema;
+	private final String nullStringLiteral;
 
-	private transient HBaseReadWriteHelper readHelper;
 	private transient Connection hConnection;
 	private transient HTable table;
+	private transient HBaseSerde serde;
 
-	public HBaseLookupFunction(
+	public HBaseRowDataLookupFunction(
 			Configuration configuration,
 			String hTableName,
-			HBaseTableSchema hbaseTableSchema) {
+			HBaseTableSchema hbaseTableSchema,
+			String nullStringLiteral) {
 		this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
 		this.hTableName = hTableName;
 		this.hbaseTableSchema = hbaseTableSchema;
+		this.nullStringLiteral = nullStringLiteral;
 	}
 
 	/**
@@ -75,25 +78,23 @@ public class HBaseLookupFunction extends TableFunction<Row> {
 	 */
 	public void eval(Object rowKey) throws IOException {
 		// fetch result
-		Result result = table.get(readHelper.createGet(rowKey));
-		if (!result.isEmpty()) {
-			// parse and collect
-			collect(readHelper.parseToRow(result, rowKey));
+		Get get = serde.createGet(rowKey);
+		if (get != null) {
+			Result result = table.get(get);
+			if (!result.isEmpty()) {
+				// parse and collect
+				collect(serde.convertToRow(result));
+			}
 		}
 	}
 
-	@Override
-	public TypeInformation<Row> getResultType() {
-		return hbaseTableSchema.convertsToTableSchema().toRowType();
-	}
-
-	private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() {
+	private Configuration prepareRuntimeConfiguration() {
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
 		// and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
 		// user params from client-side have the highest priority
-		org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+		Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
 			serializedConfig,
-			HBaseConfiguration.create());
+			HBaseConfigurationUtil.getHBaseConfiguration());
 
 		// do validation: check key option(s) in final runtime configuration
 		if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
@@ -107,7 +108,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
 	@Override
 	public void open(FunctionContext context) {
 		LOG.info("start open ...");
-		org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+		Configuration config = prepareRuntimeConfiguration();
 		try {
 			hConnection = ConnectionFactory.createConnection(config);
 			table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
@@ -118,7 +119,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
 			LOG.error("Exception while creating connection to HBase.", ioe);
 			throw new RuntimeException("Cannot create connection to HBase.", ioe);
 		}
-		this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
+		this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
 		LOG.info("end open.");
 	}
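
At runtime this function is not invoked directly; the planner calls eval() once per probe row of a temporal table join. A hedged usage sketch in the style of the ITCase changes below (the source table src with proctime attribute proc and the HBase table hTable are illustrative names):

    Table result = tEnv.sqlQuery(
        "SELECT s.a, h.family1.col1 FROM src AS s" +
        " JOIN hTable FOR SYSTEM_TIME AS OF s.proc AS h" +
        " ON s.a = h.rowkey");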
 
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index 693cb62..f888914 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -76,6 +77,7 @@ public class HBaseSerde {
 	private final @Nullable FieldDecoder keyDecoder;
 	private final FieldEncoder[][] qualifierEncoders;
 	private final FieldDecoder[][] qualifierDecoders;
+	private final GenericRowData rowWithRowKey;
 
 	public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {
 		this.families = hbaseSchema.getFamilyKeys();
@@ -115,6 +117,7 @@ public class HBaseSerde {
 				.toArray(FieldDecoder[]::new);
 			this.reusedFamilyRows[f] = new GenericRowData(dataTypes.length);
 		}
+		this.rowWithRowKey = new GenericRowData(1);
 	}
 
 	/**
@@ -196,6 +199,29 @@ public class HBaseSerde {
 	}
 
 	/**
+	 * Returns an instance of {@link Get} that retrieves the matching records from the HBase table.
+	 *
+	 * @return the appropriate {@link Get} instance, or null if the encoded rowkey is empty (the record is dropped as dirty).
+	 */
+	public Get createGet(Object rowKey) {
+		checkArgument(keyEncoder != null, "row key is not set.");
+		rowWithRowKey.setField(0, rowKey);
+		byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
+		if (rowkey.length == 0) {
+			// drop dirty records, rowkey shouldn't be zero length
+			return null;
+		}
+		Get get = new Get(rowkey);
+		for (int f = 0; f < families.length; f++) {
+			byte[] family = families[f];
+			for (byte[] qualifier : qualifiers[f]) {
+				get.addColumn(family, qualifier);
+			}
+		}
+		return get;
+	}
+
+	/**
 	 * Converts HBase {@link Result} into {@link RowData}.
 	 */
 	public RowData convertToRow(Result result) {
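
Two design points in createGet are worth noting. First, the scalar rowkey is wrapped in rowWithRowKey, a single-field GenericRowData that is reused across calls, so the existing FieldEncoder machinery can encode it without allocating a row per lookup. Second, a zero-length encoded key yields null, letting the caller drop dirty records instead of issuing a Get that cannot match. The lookup function pairs the two methods like this (a condensed restatement of the eval() hunk above):

    Get get = serde.createGet(rowKey);
    if (get != null) {                    // null: empty rowkey, record dropped
        Result result = table.get(get);
        if (!result.isEmpty()) {
            collect(serde.convertToRow(result));   // HBase Result -> internal RowData
        }
    }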
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 3777dec..9ab36cc 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.connector.hbase.source.HBaseInputFormat;
 import org.apache.flink.connector.hbase.source.HBaseTableSource;
-import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase.util.HBaseTestBase;
 import org.apache.flink.connector.hbase.util.PlannerType;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -40,20 +39,17 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableImpl;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
 import org.apache.flink.table.planner.sinks.CollectRowTableSink;
 import org.apache.flink.table.planner.sinks.CollectTableSink;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -61,26 +57,15 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
 
 import scala.Option;
 
 import static org.apache.flink.connector.hbase.util.PlannerType.OLD_PLANNER;
 import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -124,7 +109,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 			hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
 			hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
 			hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-			hbaseTable.setRowKey("rowkey", Integer.class);
+			hbaseTable.setRowKey(ROW_KEY, Integer.class);
 			((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
 		} else {
 			tEnv.executeSql(
@@ -176,7 +161,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 			hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
 			hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
 			hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-			hbaseTable.setRowKey("rowkey", Integer.class);
+			hbaseTable.setRowKey(ROW_KEY, Integer.class);
 			((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
 		} else {
 			tEnv.executeSql(
@@ -221,7 +206,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		if (isLegacyConnector) {
 			HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
 			// shuffle order of column registration
-			hbaseTable.setRowKey("rowkey", Integer.class);
+			hbaseTable.setRowKey(ROW_KEY, Integer.class);
 			hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
 			hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
 			hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
@@ -268,7 +253,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 			HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
 			hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
 			hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
-			hbaseTable.setRowKey("rowkey", Integer.class);
+			hbaseTable.setRowKey(ROW_KEY, Integer.class);
 			((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
 		} else {
 			tEnv.executeSql(
@@ -319,115 +304,33 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		assertEquals(360, (int) resultSet.get(0).f0);
 	}
 
-	// -------------------------------------------------------------------------------------
-	// HBaseTableSink tests
-	// -------------------------------------------------------------------------------------
-
-	// prepare a source collection.
-	private static final List<Row> testData1 = new ArrayList<>();
-	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE,
-			Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME},
-		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"});
-
-	static {
-		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
-			Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
-		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
-			Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
-		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
-			Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
-		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
-			Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
-		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
-			Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
-		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
-			Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
-		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
-			Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
-		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
-			Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
-	}
-
 	@Test
 	public void testTableSink() throws Exception {
 		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
 
-		if (isLegacyConnector) {
-			HBaseTableSchema schema = new HBaseTableSchema();
-			schema.addColumn(FAMILY1, F1COL1, Integer.class);
-			schema.addColumn(FAMILY2, F2COL1, String.class);
-			schema.addColumn(FAMILY2, F2COL2, Long.class);
-			schema.setRowKey("rk", Integer.class);
-			schema.addColumn(FAMILY3, F3COL1, Double.class);
-			schema.addColumn(FAMILY3, F3COL2, Boolean.class);
-			schema.addColumn(FAMILY3, F3COL3, String.class);
-
-			Map<String, String> tableProperties = new HashMap<>();
-			tableProperties.put("connector.type", "hbase");
-			tableProperties.put("connector.version", "1.4.3");
-			tableProperties.put("connector.property-version", "1");
-			tableProperties.put("connector.table-name", TEST_TABLE_2);
-			tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
-			tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
-			DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-			descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
-			descriptorProperties.putProperties(tableProperties);
-			TableSink tableSink = TableFactoryService
-				.find(HBaseTableFactory.class, descriptorProperties.asMap())
-				.createTableSink(descriptorProperties.asMap());
-			((TableEnvironmentInternal) tEnv).registerTableSinkInternal("hbase", tableSink);
-		} else {
-			tEnv.executeSql(
-					"CREATE TABLE hbase (" +
-					" family1 ROW<col1 INT>," +
-					" family2 ROW<col1 STRING, col2 BIGINT>," +
-					" rk INT," +
-					" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" +
-					") WITH (" +
-					" 'connector' = 'hbase-1.4'," +
-					" 'table-name' = '" + TEST_TABLE_1 + "'," +
-					" 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'," +
-					" 'zookeeper.znode.parent' = '/hbase'" +
-					")");
-		}
+		// register HBase table testTable1 which contains the test data
+		String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+		tEnv.executeSql(table1DDL);
+
+		String table2DDL = createHBaseTableDDL(TEST_TABLE_2, false);
+		tEnv.executeSql(table2DDL);
 
-		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
-		tEnv.createTemporaryView("src", ds);
+		String query = "INSERT INTO " + TEST_TABLE_2 + " SELECT" +
+			" rowkey," +
+			" family1," +
+			" family2," +
+			" family3" +
+			" FROM " + TEST_TABLE_1;
 
-		String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+		// execute the insert and wait for it to finish
 		TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
 
 		// start a batch scan job to verify contents in HBase table
-		// start a batch scan job to verify contents in HBase table
-		TableEnvironment batchTableEnv = createBatchTableEnv();
-
-		if (isLegacyConnector) {
-			HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_2);
-			hbaseTable.setRowKey("rowkey", Integer.class);
-			hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
-			hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
-			hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
-			hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
-			hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
-			hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-			((TableEnvironmentInternal) batchTableEnv).registerTableSourceInternal("hTable", hbaseTable);
-		} else {
-			batchTableEnv.executeSql(
-					"CREATE TABLE hTable (" +
-					" rowkey INT," +
-					" family1 ROW<col1 INT>," +
-					" family2 ROW<col1 STRING, col2 BIGINT>," +
-					" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" +
-					") WITH (" +
-					" 'connector' = 'hbase-1.4'," +
-					" 'table-name' = '" + TEST_TABLE_1 + "'," +
-					" 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" +
-					")");
-		}
+		TableEnvironment batchEnv = createBatchTableEnv();
+		batchEnv.executeSql(table2DDL);
 
-		Table table = batchTableEnv.sqlQuery(
+		Table table = batchEnv.sqlQuery(
 			"SELECT " +
 				"  h.rowkey, " +
 				"  h.family1.col1, " +
@@ -436,9 +339,8 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 				"  h.family3.col1, " +
 				"  h.family3.col2, " +
 				"  h.family3.col3 " +
-				"FROM hTable AS h"
+				"FROM " + TEST_TABLE_2 + " AS h"
 		);
-
 		List<Row> results = collectBatchResult(table);
 		String expected =
 				"1,10,Hello-1,100,1.01,false,Welt-1\n" +
@@ -455,56 +357,37 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 
 	@Test
 	public void testTableSourceSinkWithDDL() throws Exception {
+		if (OLD_PLANNER.equals(planner) || isLegacyConnector) {
+			// only test the blink planner with the new connector: the TIMESTAMP/DATE/TIME/DECIMAL types work well
+			// in the new connector (using the blink planner) but exhibit precision problems in the old planner and legacy connector.
+			return;
+		}
+
 		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
 
-		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
-		tEnv.createTemporaryView("src", ds);
+		// register HBase table testTable1 which contains the test data
+		String table1DDL = createHBaseTableDDL(TEST_TABLE_1, true);
+		tEnv.executeSql(table1DDL);
 
-		// register hbase table
-		String quorum = getZookeeperQuorum();
-		String ddl;
-		if (isLegacyConnector) {
-			ddl = "CREATE TABLE hbase (\n" +
-				"    rowkey INT," +
-				"    family1 ROW<col1 INT>,\n" +
-				"    family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
-				"    family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
-				"    family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
-				") WITH (\n" +
-				"    'connector.type' = 'hbase',\n" +
-				"    'connector.version' = '1.4.3',\n" +
-				"    'connector.table-name' = 'testTable3',\n" +
-				"    'connector.zookeeper.quorum' = '" + quorum + "',\n" +
-				"    'connector.zookeeper.znode.parent' = '/hbase' " +
-				")";
-		} else {
-			ddl = "CREATE TABLE hbase (\n" +
-				"    rowkey INT," +
-				"    family1 ROW<col1 INT>,\n" +
-				"    family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
-				"    family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
-				"    family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
-				") WITH (\n" +
-				"    'connector' = 'hbase-1.4',\n" +
-				"    'table-name' = 'testTable3',\n" +
-				"    'zookeeper.quorum' = '" + quorum + "',\n" +
-				"    'zookeeper.znode.parent' = '/hbase' " +
-				")";
-		}
-		tEnv.executeSql(ddl);
+		// register HBase table testTable3, which is initially empty
+		String table3DDL = createHBaseTableDDL(TEST_TABLE_3, true);
+		tEnv.executeSql(table3DDL);
 
-		String query = "INSERT INTO hbase " +
-			"SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
-			"FROM src";
-		TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
+		String insertStatement = "INSERT INTO " + TEST_TABLE_3 +
+			" SELECT rowkey," +
+			" family1," +
+			" family2," +
+			" family3," +
+			" family4" +
+			" from " + TEST_TABLE_1;
+		// execute the insert and wait for it to finish
+		TableEnvUtil.execInsertSqlAndWaitResult(tEnv, insertStatement);
 
 		// start a batch scan job to verify contents in HBase table
-		TableEnvironment batchTableEnv = createBatchTableEnv();
-		batchTableEnv.executeSql(ddl);
-
-		Table table = batchTableEnv.sqlQuery(
-			"SELECT " +
+		TableEnvironment batchEnv = createBatchTableEnv();
+		batchEnv.executeSql(table3DDL);
+		String query = "SELECT " +
 				"  h.rowkey, " +
 				"  h.family1.col1, " +
 				"  h.family2.col1, " +
@@ -514,115 +397,103 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 				"  h.family3.col3, " +
 				"  h.family4.col1, " +
 				"  h.family4.col2, " +
-				"  h.family4.col3 " +
-				"FROM hbase AS h"
-		);
-
-		List<Row> results = collectBatchResult(table);
-		String expected =
-				"1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" +
-				"2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" +
-				"3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" +
-				"4,40,null,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" +
-				"5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" +
-				"6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" +
-				"7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" +
-				"8,80,null,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n";
-
-		TestBaseUtils.compareResultAsText(results, expected);
-	}
-
+				"  h.family4.col3, " +
+				"  h.family4.col4 " +
+				" FROM " + TEST_TABLE_3 + " AS h";
+		Iterator<Row> collected = tEnv.executeSql(query).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 
-	// -------------------------------------------------------------------------------------
-	// HBase lookup source tests
-	// -------------------------------------------------------------------------------------
-
-	// prepare a source collection.
-	private static final List<Row> testData2 = new ArrayList<>();
-	private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo(
-		new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
-		new String[]{"a", "b", "c"});
-
-	static {
-		testData2.add(Row.of(1, 1L, "Hi"));
-		testData2.add(Row.of(2, 2L, "Hello"));
-		testData2.add(Row.of(3, 2L, "Hello world"));
-		testData2.add(Row.of(3, 3L, "Hello world!"));
+		List<String> expected = new ArrayList<>();
+		expected.add("1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+		expected.add("2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+		expected.add("3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+		expected.add("4,40,null,400,4.04,true,Welt-4,2019-08-18T19:03,2019-08-18,19:03,12345678.0004");
+		expected.add("5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19T19:10,2019-08-19,19:10,12345678.0005");
+		expected.add("6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19T19:20,2019-08-19,19:20,12345678.0006");
+		expected.add("7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19T19:30,2019-08-19,19:30,12345678.0007");
+		expected.add("8,80,null,800,8.08,true,Welt-8,2019-08-19T19:40,2019-08-19,19:40,12345678.0008");
+		assertEquals(expected, result);
 	}
 
 	@Test
 	public void testHBaseLookupTableSource() throws Exception {
-		if (OLD_PLANNER.equals(planner)) {
+		if (OLD_PLANNER.equals(planner) || isLegacyConnector) {
 			// lookup table source is only supported in blink planner, skip for old planner
+			// the TIMESTAMP/DATE/TIME/DECIMAL types work well in the new connector; skip the legacy connector
 			return;
 		}
-		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
-		StreamITCase.clear();
+
+		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+		tEnv.executeSql(
+			"CREATE TABLE " + TEST_TABLE_1 + " (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," +
+				" rowkey INT," +
+				" family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3), col4 DECIMAL(12, 4)>," +
+				" PRIMARY KEY (rowkey) NOT ENFORCED" +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = '" + TEST_TABLE_1 + "'," +
+				" 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" +
+				")");
 
 		// prepare a source table
 		String srcTableName = "src";
-		DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
-		Table in = streamTableEnv.fromDataStream(ds, $("a"), $("b"), $("c"), $("proc").proctime());
-		streamTableEnv.registerTable(srcTableName, in);
+		DataStream<Row> srcDs = execEnv.fromCollection(testData).returns(testTypeInfo);
+		Table in = tEnv.fromDataStream(srcDs, $("a"), $("b"), $("c"), $("proc").proctime());
+		tEnv.registerTable(srcTableName, in);
 
-		if (isLegacyConnector) {
-			Map<String, String> tableProperties = hbaseTableProperties();
-			TableSource<?> source = TableFactoryService
-				.find(HBaseTableFactory.class, tableProperties)
-				.createTableSource(tableProperties);
-			((TableEnvironmentInternal) streamTableEnv).registerTableSourceInternal("hbaseLookup", source);
-		} else {
-			streamTableEnv.executeSql(
-					"CREATE TABLE hbaseLookup (" +
-					" family1 ROW<col1 INT>," +
-					" rk INT," +
-					" family2 ROW<col1 STRING, col2 BIGINT>," +
-					" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" +
-					") WITH (" +
-					" 'connector' = 'hbase-1.4'," +
-					" 'table-name' = '" + TEST_TABLE_1 + "'," +
-					" 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" +
-					")");
-		}
 		// perform a temporal table join query
-		String query = "SELECT a,family1.col1, family3.col3 FROM src " +
-			"JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk";
-		Table result = streamTableEnv.sqlQuery(query);
-
-		DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-
-		streamEnv.execute();
+		String dimJoinQuery = "SELECT" +
+			" a," +
+			" b," +
+			" h.family1.col1," +
+			" h.family2.col1," +
+			" h.family2.col2," +
+			" h.family3.col1," +
+			" h.family3.col2," +
+			" h.family3.col3," +
+			" h.family4.col1," +
+			" h.family4.col2," +
+			" h.family4.col3," +
+			" h.family4.col4 " +
+			" FROM src JOIN " + TEST_TABLE_1 + " FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rowkey";
+		Iterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 
 		List<String> expected = new ArrayList<>();
-		expected.add("1,10,Welt-1");
-		expected.add("2,20,Welt-2");
-		expected.add("3,30,Welt-3");
-		expected.add("3,30,Welt-3");
+		expected.add("1,1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+		expected.add("2,2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+		expected.add("3,2,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+		expected.add("3,3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
 
-		StreamITCase.compareWithList(expected);
+		assertEquals(expected, result);
 	}
 
-	private static Map<String, String> hbaseTableProperties() {
-		Map<String, String> properties = new HashMap<>();
-		properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
-		properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
-		properties.put(CONNECTOR_PROPERTY_VERSION, "1");
-		properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
-		properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum());
-		// schema
-		String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
-		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
-		TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
-		TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
-		TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
-
-		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-		TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
-		descriptorProperties.putTableSchema(SCHEMA, tableSchema);
-		descriptorProperties.putProperties(properties);
-		return descriptorProperties.asMap();
+	// -------------------------------------------------------------------------------------
+	// HBase lookup source tests
+	// -------------------------------------------------------------------------------------
+
+	// prepare a source collection.
+	private static final List<Row> testData = new ArrayList<>();
+	private static final RowTypeInfo testTypeInfo = new RowTypeInfo(
+		new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
+		new String[]{"a", "b", "c"});
+
+	static {
+		testData.add(Row.of(1, 1L, "Hi"));
+		testData.add(Row.of(2, 2L, "Hello"));
+		testData.add(Row.of(3, 2L, "Hello world"));
+		testData.add(Row.of(3, 3L, "Hello world!"));
 	}
 
 	// ------------------------------- Utilities -------------------------------------------------
@@ -721,4 +592,42 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		}
 	}
 
+	private String createHBaseTableDDL(String tableName, boolean testTimeAndDecimalTypes) {
+		StringBuilder family4Statement = new StringBuilder();
+		if (testTimeAndDecimalTypes) {
+			family4Statement.append(", family4 ROW<col1 TIMESTAMP(3)");
+			family4Statement.append(", col2 DATE");
+			family4Statement.append(", col3 TIME(3)");
+			family4Statement.append(", col4 DECIMAL(12, 4)");
+			family4Statement.append("> \n");
+		}
+		if (isLegacyConnector) {
+			return "CREATE TABLE " + tableName + "(\n" +
+				"	rowkey INT,\n" +
+				"   family1 ROW<col1 INT>,\n" +
+				"   family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+				"   family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>" +
+				family4Statement.toString() +
+				") WITH (\n" +
+				"   'connector.type' = 'hbase',\n" +
+				"   'connector.version' = '1.4.3',\n" +
+				"   'connector.table-name' = '" + tableName + "',\n" +
+				"   'connector.zookeeper.quorum' = '" + getZookeeperQuorum() + "',\n" +
+				"   'connector.zookeeper.znode.parent' = '/hbase' " +
+				")";
+		} else {
+			return "CREATE TABLE " + tableName + "(\n" +
+				"   rowkey INT," +
+				"   family1 ROW<col1 INT>,\n" +
+				"   family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+				"   family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>" +
+				family4Statement.toString() +
+				") WITH (\n" +
+				"   'connector' = 'hbase-1.4',\n" +
+				"   'table-name' = '" + tableName + "',\n" +
+				"   'zookeeper.quorum' = '" + getZookeeperQuorum() + "',\n" +
+				"   'zookeeper.znode.parent' = '/hbase' " +
+				")";
+		}
+	}
 }
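
For reference, with testTimeAndDecimalTypes = true and the new connector, the createHBaseTableDDL helper above expands to roughly the following DDL (the ZooKeeper quorum is environment-specific):

    CREATE TABLE testTable1(
       rowkey INT,
       family1 ROW<col1 INT>,
       family2 ROW<col1 VARCHAR, col2 BIGINT>,
       family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,
       family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3), col4 DECIMAL(12, 4)>
    ) WITH (
       'connector' = 'hbase-1.4',
       'table-name' = 'testTable1',
       'zookeeper.quorum' = '<quorum>',
       'zookeeper.znode.parent' = '/hbase'
    )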
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index 061114e..47775ac 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource;
-import org.apache.flink.connector.hbase.source.HBaseLookupFunction;
+import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -111,8 +111,8 @@ public class HBaseDynamicTableFactoryTest {
 		assertTrue(lookupProvider instanceof TableFunctionProvider);
 
 		TableFunction tableFunction = ((TableFunctionProvider) lookupProvider).createTableFunction();
-		assertTrue(tableFunction instanceof HBaseLookupFunction);
-		assertEquals("testHBastTable", ((HBaseLookupFunction) tableFunction).getHTableName());
+		assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
+		assertEquals("testHBastTable", ((HBaseRowDataLookupFunction) tableFunction).getHTableName());
 
 		HBaseTableSchema hbaseSchema = hbaseSource.getHBaseTableSchema();
 		assertEquals(2, hbaseSchema.getRowKeyIndex());
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
index e00b986..c376b95 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
@@ -28,9 +28,17 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timeToInternal;
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToInternal;
+
 /**
  * Abstract IT case class for HBase.
  */
@@ -40,7 +48,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	protected static final String TEST_TABLE_2 = "testTable2";
 	protected static final String TEST_TABLE_3 = "testTable3";
 
-	protected static final String ROWKEY = "rk";
+	protected static final String ROW_KEY = "rowkey";
+
 	protected static final String FAMILY1 = "family1";
 	protected static final String F1COL1 = "col1";
 
@@ -54,11 +63,16 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	protected static final String F3COL3 = "col3";
 
 	protected static final String FAMILY4 = "family4";
+	protected static final String F4COL1 = "col1";
+	protected static final String F4COL2 = "col2";
+	protected static final String F4COL3 = "col3";
+	protected static final String F4COL4 = "col4";
 
 	private static final byte[][] FAMILIES = new byte[][]{
 		Bytes.toBytes(FAMILY1),
 		Bytes.toBytes(FAMILY2),
-		Bytes.toBytes(FAMILY3)
+		Bytes.toBytes(FAMILY3),
+		Bytes.toBytes(FAMILY4)
 	};
 
 	private static final byte[][] SPLIT_KEYS = new byte[][]{Bytes.toBytes(4)};
@@ -107,14 +121,30 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		HTable table = openTable(tableName);
 		List<Put> puts = new ArrayList<>();
 		// add some data
-		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+			Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"),
+			Time.valueOf("19:00:00"), new BigDecimal(12345678.0001)));
+		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+			Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"),
+			new BigDecimal(12345678.0002)));
+		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+			Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"),
+			new BigDecimal(12345678.0003)));
+		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4",
+			Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"),
+			new BigDecimal(12345678.0004)));
+		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+			Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"),
+			new BigDecimal(12345678.0005)));
+		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+			Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"),
+			new BigDecimal(12345678.0006)));
+		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+			Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"),
+			new BigDecimal(12345678.0007)));
+		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8",
+			Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"),
+			new BigDecimal(12345678.0008)));
 
 		// append rows to table
 		table.put(puts);
@@ -139,7 +169,18 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		createTable(tableName, families, SPLIT_KEYS);
 	}
 
-	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+	private static Put putRow(
+			int rowKey,
+			int f1c1,
+			String f2c1,
+			long f2c2,
+			double f3c1,
+			boolean f3c2,
+			String f3c3,
+			Timestamp f4c1,
+			Date f4c2,
+			Time f4c3,
+			BigDecimal f4c4) {
 		Put put = new Put(Bytes.toBytes(rowKey));
 		// family 1
 		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
@@ -153,6 +194,11 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
 		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
 
+		// family 4
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL1), Bytes.toBytes(timestampToInternal(f4c1)));
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL2), Bytes.toBytes(dateToInternal(f4c2)));
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL3), Bytes.toBytes(timeToInternal(f4c3)));
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL4), Bytes.toBytes(f4c4));
 		return put;
 	}
 }
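
The FAMILY4 cells are written using Flink's internal representations so that the new connector's decoders can read them back directly. A hedged illustration of what the imported SqlDateTimeUtils helpers produce (the return types are my reading of this release):

    long tsMillis = timestampToInternal(Timestamp.valueOf("2019-08-18 19:00:00")); // epoch milliseconds
    int dateDays  = dateToInternal(Date.valueOf("2019-08-18"));                    // days since 1970-01-01
    int timeMs    = timeToInternal(Time.valueOf("19:00:00"));                      // milliseconds of the day
    put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL1), Bytes.toBytes(tsMillis));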