You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 02:09:07 UTC

[flink] 03/03: [FLINK-13290][hbase] Enable blink planner for integration tests of HBase

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d981ebc16ce6a1089e021fcb7a634ebe0167be5
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Jul 30 18:17:39 2019 +0800

    [FLINK-13290][hbase] Enable blink planner for integration tests of HBase
    
    This commit combines HBaseTableSourceITCase and HBaseLookupFunctionITCase and HBaseConnectorITCase into one class.
    This can save much cluster initialization time for us.
    
    This closes #9275
---
 flink-connectors/flink-hbase/pom.xml               |  27 +-
 .../flink/addons/hbase/HBaseTableSource.java       |   5 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   | 601 +++++++++++++--------
 .../addons/hbase/HBaseLookupFunctionITCase.java    | 203 -------
 .../apache/flink/addons/hbase/HBaseSinkITCase.java | 186 -------
 .../flink/addons/hbase/util/HBaseTestBase.java     | 150 +++++
 .../HBaseTestingClusterAutoStarter.java}           |  11 +-
 .../flink/addons/hbase/util/PlannerType.java       |  27 +
 8 files changed, 571 insertions(+), 639 deletions(-)

diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index a9d3c22..eba6028 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -99,14 +99,6 @@ under the License.
 			<scope>provided</scope>
 			<optional>true</optional>
 		</dependency>
-		<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-			<optional>true</optional>
-		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -299,6 +291,25 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 3fff5fa..b1e7161 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -180,7 +180,10 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 
 	@Override
 	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
-		throw new UnsupportedOperationException("HBase table can not convert to DataStream currently.");
+		HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+		return execEnv
+			.createInput(new HBaseRowInputFormat(conf, tableName, projectedSchema), getReturnType())
+			.name(explainSource());
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index bd992c5..6a0909e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -1,6 +1,4 @@
 /*
- * Copyright The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,212 +18,138 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.addons.hbase.util.HBaseTestBase;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.junit.Assert.assertEquals;
 
 /**
- * This class contains integrations tests for multiple HBase connectors:
- * - TableInputFormat
- * - HBaseTableSource
- *
- * <p>These tests are located in a single test file to avoided unnecessary initializations of the
- * HBaseTestingCluster which takes about half a minute.
- *
+ * IT cases for HBase connector (including HBaseTableSource and HBaseTableSink).
  */
-public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
-
-	private static final String TEST_TABLE = "testTable";
-
-	private static final String FAMILY1 = "family1";
-	private static final String F1COL1 = "col1";
-
-	private static final String FAMILY2 = "family2";
-	private static final String F2COL1 = "col1";
-	private static final String F2COL2 = "col2";
-
-	private static final String FAMILY3 = "family3";
-	private static final String F3COL1 = "col1";
-	private static final String F3COL2 = "col2";
-	private static final String F3COL3 = "col3";
-
-	@BeforeClass
-	public static void activateHBaseCluster() throws IOException {
-		registerHBaseMiniClusterInClasspath();
-		prepareTable();
-		LimitNetworkBuffersTestEnvironment.setAsContext();
-	}
-
-	@AfterClass
-	public static void resetExecutionEnvironmentFactory() {
-		LimitNetworkBuffersTestEnvironment.unsetAsContext();
-	}
-
-	private static void prepareTable() throws IOException {
-
-		// create a table
-		TableName tableName = TableName.valueOf(TEST_TABLE);
-		// column families
-		byte[][] families = new byte[][]{
-			Bytes.toBytes(FAMILY1),
-			Bytes.toBytes(FAMILY2),
-			Bytes.toBytes(FAMILY3)
-		};
-		// split keys
-		byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
-		createTable(tableName, families, splitKeys);
-
-		// get the HTable instance
-		HTable table = openTable(tableName);
-		List<Put> puts = new ArrayList<>();
-		// add some data
-		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
-		// append rows to table
-		table.put(puts);
-		table.close();
-	}
-
-	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
-		Put put = new Put(Bytes.toBytes(rowKey));
-		// family 1
-		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
-		// family 2
-		if (f2c1 != null) {
-			put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
-		}
-		put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
-		// family 3
-		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
-		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
-		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
-
-		return put;
-	}
+public class HBaseConnectorITCase extends HBaseTestBase {
 
-	// ######## HBaseTableSource tests ############
+	// -------------------------------------------------------------------------------------
+	// HBaseTableSource tests
+	// -------------------------------------------------------------------------------------
 
 	@Test
 	public void testTableSourceFullScan() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		TableEnvironment tEnv = createBatchTableEnv();
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
 		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
 		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
 		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
 		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
 		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-		tableEnv.registerTableSource("hTable", hbaseTable);
-
-		Table result = tableEnv.sqlQuery(
-			"SELECT " +
-				"  h.family1.col1, " +
-				"  h.family2.col1, " +
-				"  h.family2.col2, " +
-				"  h.family3.col1, " +
-				"  h.family3.col2, " +
-				"  h.family3.col3 " +
-				"FROM hTable AS h"
-		);
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
+		tEnv.registerTableSource("hTable", hbaseTable);
+
+		Table table = tEnv.sqlQuery("SELECT " +
+			"  h.family1.col1, " +
+			"  h.family2.col1, " +
+			"  h.family2.col2, " +
+			"  h.family3.col1, " +
+			"  h.family3.col2, " +
+			"  h.family3.col3 " +
+			"FROM hTable AS h");
+
+		List<Row> results = collectBatchResult(table);
 		String expected =
 			"10,Hello-1,100,1.01,false,Welt-1\n" +
-			"20,Hello-2,200,2.02,true,Welt-2\n" +
-			"30,Hello-3,300,3.03,false,Welt-3\n" +
-			"40,null,400,4.04,true,Welt-4\n" +
-			"50,Hello-5,500,5.05,false,Welt-5\n" +
-			"60,Hello-6,600,6.06,true,Welt-6\n" +
-			"70,Hello-7,700,7.07,false,Welt-7\n" +
-			"80,null,800,8.08,true,Welt-8\n";
+				"20,Hello-2,200,2.02,true,Welt-2\n" +
+				"30,Hello-3,300,3.03,false,Welt-3\n" +
+				"40,null,400,4.04,true,Welt-4\n" +
+				"50,Hello-5,500,5.05,false,Welt-5\n" +
+				"60,Hello-6,600,6.06,true,Welt-6\n" +
+				"70,Hello-7,700,7.07,false,Welt-7\n" +
+				"80,null,800,8.08,true,Welt-8\n";
 
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
 	@Test
 	public void testTableSourceProjection() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		TableEnvironment tEnv = createBatchTableEnv();
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
 		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
 		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
 		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
 		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
 		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-		tableEnv.registerTableSource("hTable", hbaseTable);
+		tEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sqlQuery(
-			"SELECT " +
-				"  h.family1.col1, " +
-				"  h.family3.col1, " +
-				"  h.family3.col2, " +
-				"  h.family3.col3 " +
-				"FROM hTable AS h"
-		);
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
+		Table table = tEnv.sqlQuery("SELECT " +
+			"  h.family1.col1, " +
+			"  h.family3.col1, " +
+			"  h.family3.col2, " +
+			"  h.family3.col3 " +
+			"FROM hTable AS h");
 
+		List<Row> results = collectBatchResult(table);
 		String expected =
 			"10,1.01,false,Welt-1\n" +
-			"20,2.02,true,Welt-2\n" +
-			"30,3.03,false,Welt-3\n" +
-			"40,4.04,true,Welt-4\n" +
-			"50,5.05,false,Welt-5\n" +
-			"60,6.06,true,Welt-6\n" +
-			"70,7.07,false,Welt-7\n" +
-			"80,8.08,true,Welt-8\n";
+				"20,2.02,true,Welt-2\n" +
+				"30,3.03,false,Welt-3\n" +
+				"40,4.04,true,Welt-4\n" +
+				"50,5.05,false,Welt-5\n" +
+				"60,6.06,true,Welt-6\n" +
+				"70,7.07,false,Welt-7\n" +
+				"80,8.08,true,Welt-8\n";
 
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
 	@Test
 	public void testTableSourceFieldOrder() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		TableEnvironment tEnv = createBatchTableEnv();
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
 		// shuffle order of column registration
 		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
 		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
@@ -233,68 +157,319 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
 		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-		tableEnv.registerTableSource("hTable", hbaseTable);
+		tEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sqlQuery(
-			"SELECT * FROM hTable AS h"
-		);
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
+		Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
 
+		List<Row> results = collectBatchResult(table);
 		String expected =
 			"Hello-1,100,1.01,false,Welt-1,10\n" +
-			"Hello-2,200,2.02,true,Welt-2,20\n" +
-			"Hello-3,300,3.03,false,Welt-3,30\n" +
-			"null,400,4.04,true,Welt-4,40\n" +
-			"Hello-5,500,5.05,false,Welt-5,50\n" +
-			"Hello-6,600,6.06,true,Welt-6,60\n" +
-			"Hello-7,700,7.07,false,Welt-7,70\n" +
-			"null,800,8.08,true,Welt-8,80\n";
+				"Hello-2,200,2.02,true,Welt-2,20\n" +
+				"Hello-3,300,3.03,false,Welt-3,30\n" +
+				"null,400,4.04,true,Welt-4,40\n" +
+				"Hello-5,500,5.05,false,Welt-5,50\n" +
+				"Hello-6,600,6.06,true,Welt-6,60\n" +
+				"Hello-7,700,7.07,false,Welt-7,70\n" +
+				"null,800,8.08,true,Welt-8,80\n";
 
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
 	@Test
 	public void testTableSourceReadAsByteArray() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
+		TableEnvironment tEnv = createBatchTableEnv();
 		// fetch row2 from the table till the end
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
 		hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
 		hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
+		tEnv.registerTableSource("hTable", hbaseTable);
+		tEnv.registerFunction("toUTF8", new ToUTF8());
+		tEnv.registerFunction("toLong", new ToLong());
 
-		tableEnv.registerTableSource("hTable", hbaseTable);
-		tableEnv.registerFunction("toUTF8", new ToUTF8());
-		tableEnv.registerFunction("toLong", new ToLong());
-
-		Table result = tableEnv.sqlQuery(
+		Table table = tEnv.sqlQuery(
 			"SELECT " +
 				"  toUTF8(h.family2.col1), " +
 				"  toLong(h.family2.col2) " +
 				"FROM hTable AS h"
 		);
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
 
+		List<Row> results = collectBatchResult(table);
 		String expected =
 			"Hello-1,100\n" +
-			"Hello-2,200\n" +
-			"Hello-3,300\n" +
-			"null,400\n" +
-			"Hello-5,500\n" +
-			"Hello-6,600\n" +
-			"Hello-7,700\n" +
-			"null,800\n";
+				"Hello-2,200\n" +
+				"Hello-3,300\n" +
+				"null,400\n" +
+				"Hello-5,500\n" +
+				"Hello-6,600\n" +
+				"Hello-7,700\n" +
+				"null,800\n";
 
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
+	@Test
+	public void testTableInputFormat() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple1<Integer>> result = env
+			.createInput(new InputFormatForTestTable())
+			.reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> Tuple1.of(v1.f0 + v2.f0));
+
+		List<Tuple1<Integer>> resultSet = result.collect();
+
+		assertEquals(1, resultSet.size());
+		assertEquals(360, (int) resultSet.get(0).f0);
+	}
+
+	// -------------------------------------------------------------------------------------
+	// HBaseTableSink tests
+	// -------------------------------------------------------------------------------------
+
+	// prepare a source collection.
+	private static final List<Row> testData1 = new ArrayList<>();
+	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
+		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
+		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+
+	static {
+		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
+		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+	}
+
+	@Test
+	public void testTableSink() throws Exception {
+		HBaseTableSchema schema = new HBaseTableSchema();
+		schema.addColumn(FAMILY1, F1COL1, Integer.class);
+		schema.addColumn(FAMILY2, F2COL1, String.class);
+		schema.addColumn(FAMILY2, F2COL2, Long.class);
+		schema.setRowKey("rk", Integer.class);
+		schema.addColumn(FAMILY3, F3COL1, Double.class);
+		schema.addColumn(FAMILY3, F3COL2, Boolean.class);
+		schema.addColumn(FAMILY3, F3COL3, String.class);
+
+		Map<String, String> tableProperties = new HashMap<>();
+		tableProperties.put("connector.type", "hbase");
+		tableProperties.put("connector.version", "1.4.3");
+		tableProperties.put("connector.property-version", "1");
+		tableProperties.put("connector.table-name", TEST_TABLE_2);
+		tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
+		tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
+		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
+		descriptorProperties.putProperties(tableProperties);
+		TableSink tableSink = TableFactoryService
+			.find(HBaseTableFactory.class, descriptorProperties.asMap())
+			.createTableSink(descriptorProperties.asMap());
+
+		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+		tEnv.registerDataStream("src", ds);
+		tEnv.registerTableSink("hbase", tableSink);
+
+		String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+		tEnv.sqlUpdate(query);
+
+		// wait to finish
+		tEnv.execute("HBase Job");
+
+		// start a batch scan job to verify contents in HBase table
+		// start a batch scan job to verify contents in HBase table
+		TableEnvironment batchTableEnv = createBatchTableEnv();
+
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_2);
+		hbaseTable.setRowKey("rowkey", Integer.class);
+		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+		batchTableEnv.registerTableSource("hTable", hbaseTable);
+
+		Table table = batchTableEnv.sqlQuery(
+			"SELECT " +
+				"  h.rowkey, " +
+				"  h.family1.col1, " +
+				"  h.family2.col1, " +
+				"  h.family2.col2, " +
+				"  h.family3.col1, " +
+				"  h.family3.col2, " +
+				"  h.family3.col3 " +
+				"FROM hTable AS h"
+		);
+
+		List<Row> results = collectBatchResult(table);
+		String expected =
+			"1,10,Hello-1,100,1.01,false,Welt-1\n" +
+				"2,20,Hello-2,200,2.02,true,Welt-2\n" +
+				"3,30,Hello-3,300,3.03,false,Welt-3\n" +
+				"4,40,,400,4.04,true,Welt-4\n" +
+				"5,50,Hello-5,500,5.05,false,Welt-5\n" +
+				"6,60,Hello-6,600,6.06,true,Welt-6\n" +
+				"7,70,Hello-7,700,7.07,false,Welt-7\n" +
+				"8,80,,800,8.08,true,Welt-8\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+
+	// -------------------------------------------------------------------------------------
+	// HBase lookup source tests
+	// -------------------------------------------------------------------------------------
+
+	// prepare a source collection.
+	private static final List<Row> testData2 = new ArrayList<>();
+	private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo(
+		new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
+		new String[]{"a", "b", "c"});
+
+	static {
+		testData2.add(Row.of(1, 1L, "Hi"));
+		testData2.add(Row.of(2, 2L, "Hello"));
+		testData2.add(Row.of(3, 2L, "Hello world"));
+		testData2.add(Row.of(3, 3L, "Hello world!"));
+	}
+
+	@Test
+	public void testHBaseLookupFunction() throws Exception {
+		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
+		StreamITCase.clear();
+
+		// prepare a source table
+		DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
+		Table in = streamTableEnv.fromDataStream(ds, "a, b, c");
+		streamTableEnv.registerTable("src", in);
+
+		Map<String, String> tableProperties = hbaseTableProperties();
+		TableSource source = TableFactoryService
+			.find(HBaseTableFactory.class, tableProperties)
+			.createTableSource(tableProperties);
+
+		streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
+
+		// perform a temporal table join query
+		String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM src, LATERAL TABLE(hbaseLookup(a))";
+		Table result = streamTableEnv.sqlQuery(sqlQuery);
+
+		DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<>());
+
+		streamEnv.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,10,Welt-1");
+		expected.add("2,20,Welt-2");
+		expected.add("3,30,Welt-3");
+		expected.add("3,30,Welt-3");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testHBaseLookupTableSource() throws Exception {
+		if (OLD_PLANNER.equals(planner)) {
+			// lookup table source is only supported in blink planner, skip for old planner
+			return;
+		}
+		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
+		StreamITCase.clear();
+
+		// prepare a source table
+		String srcTableName = "src";
+		DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
+		Table in = streamTableEnv.fromDataStream(ds, "a, b, c, proc.proctime");
+		streamTableEnv.registerTable(srcTableName, in);
+
+		Map<String, String> tableProperties = hbaseTableProperties();
+		TableSource source = TableFactoryService
+			.find(HBaseTableFactory.class, tableProperties)
+			.createTableSource(tableProperties);
+		streamTableEnv.registerTableSource("hbaseLookup", source);
+		// perform a temporal table join query
+		String query = "SELECT a,family1.col1, family3.col3 FROM src " +
+			"JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk";
+		Table result = streamTableEnv.sqlQuery(query);
+
+		DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<>());
+
+		streamEnv.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,10,Welt-1");
+		expected.add("2,20,Welt-2");
+		expected.add("3,30,Welt-3");
+		expected.add("3,30,Welt-3");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	private static Map<String, String> hbaseTableProperties() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
+		properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+		properties.put(CONNECTOR_PROPERTY_VERSION, "1");
+		properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
+		// get zk quorum from "hbase-site.xml" in classpath
+		String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
+		properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
+		// schema
+		String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
+		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
+		TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
+		TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+		TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
+
+		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
+		descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+		descriptorProperties.putProperties(properties);
+		return descriptorProperties.asMap();
+	}
+
+	// ------------------------------- Utilities -------------------------------------------------
+
+	/**
+	 * Creates a Batch {@link TableEnvironment} depends on the {@link #planner} context.
+	 */
+	private TableEnvironment createBatchTableEnv() {
+		if (OLD_PLANNER.equals(planner)) {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			return BatchTableEnvironment.create(env, new TableConfig());
+		} else {
+			return TableEnvironment.create(batchSettings);
+		}
+	}
+
+	/**
+	 * Collects batch result depends on the {@link #planner} context.
+	 */
+	private List<Row> collectBatchResult(Table table) throws Exception {
+		TableImpl tableImpl = (TableImpl) table;
+		if (OLD_PLANNER.equals(planner)) {
+			BatchTableEnvironment batchTableEnv = (BatchTableEnvironment) tableImpl.getTableEnvironment();
+			DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class);
+			return resultSet.collect();
+		} else {
+			return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+		}
+	}
+
 	/**
 	 * A {@link ScalarFunction} that maps byte arrays to UTF-8 strings.
 	 */
 	public static class ToUTF8 extends ScalarFunction {
+		private static final long serialVersionUID = 1L;
 
 		public String eval(byte[] bytes) {
 			return Bytes.toString(bytes);
@@ -305,15 +480,18 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 	 * A {@link ScalarFunction} that maps byte array to longs.
 	 */
 	public static class ToLong extends ScalarFunction {
+		private static final long serialVersionUID = 1L;
 
 		public long eval(byte[] bytes) {
 			return Bytes.toLong(bytes);
 		}
 	}
 
-	// ######## TableInputFormat tests ############
-
-	class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> {
+	/**
+	 * A {@link TableInputFormat} for testing.
+	 */
+	public static class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> {
+		private static final long serialVersionUID = 1L;
 
 		@Override
 		protected Scan getScanner() {
@@ -322,7 +500,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 
 		@Override
 		protected String getTableName() {
-			return TEST_TABLE;
+			return TEST_TABLE_1;
 		}
 
 		@Override
@@ -331,51 +509,4 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		}
 	}
 
-	@Test
-	public void testTableInputFormat() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
-
-		DataSet<Tuple1<Integer>> result = env
-			.createInput(new InputFormatForTestTable())
-			.reduce(new ReduceFunction<Tuple1<Integer>>(){
-
-				@Override
-				public Tuple1<Integer> reduce(Tuple1<Integer> v1, Tuple1<Integer> v2) throws Exception {
-					return Tuple1.of(v1.f0 + v2.f0);
-				}
-			});
-
-		List<Tuple1<Integer>> resultSet = result.collect();
-
-		assertEquals(1, resultSet.size());
-		assertEquals(360, (int) resultSet.get(0).f0);
-	}
-
-	/**
-	 * Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
-	 * configuration that limits the maximum memory used for network buffers since the current
-	 * defaults are too high for Travis-CI.
-	 */
-	private abstract static class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
-
-		public static void setAsContext() {
-			Configuration config = new Configuration();
-			// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
-			config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
-			final LocalEnvironment le = new LocalEnvironment(config);
-
-			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					return le;
-				}
-			});
-		}
-
-		public static void unsetAsContext() {
-			resetContextEnvironment();
-		}
-	}
-
 }
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
deleted file mode 100644
index 731cf31..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * IT case Test HBaseLookupFunction.
- */
-public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
-	private static final String ROWKEY = "rk";
-	private static final String FAMILY1 = "family1";
-	private static final String F1COL1 = "col1";
-
-	private static final String FAMILY2 = "family2";
-	private static final String F2COL1 = "col1";
-	private static final String F2COL2 = "col2";
-
-	private static final String FAMILY3 = "family3";
-	private static final String F3COL1 = "col1";
-	private static final String F3COL2 = "col2";
-	private static final String F3COL3 = "col3";
-
-	private static final String HTABLE_NAME = "testSrcHBaseTable1";
-
-	// prepare a source collection.
-	private static final List<Row> testData1 = new ArrayList<>();
-
-	static {
-		testData1.add(Row.of(1, 1L, "Hi"));
-		testData1.add(Row.of(2, 2L, "Hello"));
-		testData1.add(Row.of(3, 2L, "Hello world"));
-		testData1.add(Row.of(3, 3L, "Hello world!"));
-	}
-
-	private static final TypeInformation<?>[] testTypes1 = {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
-	private static final String[] testColumns1 = {"a", "b", "c"};
-	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(testTypes1, testColumns1);
-
-	@BeforeClass
-	public static void activateHBaseCluster() throws IOException {
-		registerHBaseMiniClusterInClasspath();
-		prepareHBaseTableWithData();
-	}
-
-	private static void prepareHBaseTableWithData() throws IOException {
-		// create a table
-		TableName tableName = TableName.valueOf(HTABLE_NAME);
-		// column families
-		byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
-		// split keys
-		byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
-		createTable(tableName, families, splitKeys);
-
-		// get the HTable instance
-		HTable table = openTable(tableName);
-		List<Put> puts = new ArrayList<>();
-		// add some data
-		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
-		// append rows to table
-		table.put(puts);
-		table.close();
-	}
-
-	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
-		Put put = new Put(Bytes.toBytes(rowKey));
-		// family 1
-		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
-		// family 2
-		if (f2c1 != null) {
-			put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
-		}
-		put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
-		// family 3
-		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
-		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
-		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
-
-		return put;
-	}
-
-	private static Map<String, String> hbaseTableProperties() {
-		Map<String, String> properties = new HashMap<>();
-		properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
-		properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
-		properties.put(CONNECTOR_PROPERTY_VERSION, "1");
-		properties.put(CONNECTOR_TABLE_NAME, HTABLE_NAME);
-		// get zk quorum from "hbase-site.xml" in classpath
-		String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
-		properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
-		// schema
-		String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
-		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
-		TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
-		TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
-		TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
-
-		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-		TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
-		descriptorProperties.putTableSchema(SCHEMA, tableSchema);
-		descriptorProperties.putProperties(properties);
-		return descriptorProperties.asMap();
-	}
-
-	@Test
-	public void testHBaseLookupFunction() throws Exception {
-		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-		streamEnv.setParallelism(4);
-		StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
-		StreamITCase.clear();
-
-		// prepare a source table
-		String srcTableName = "testStreamSrcTable1";
-		DataStream<Row> ds = streamEnv.fromCollection(testData1).returns(testTypeInfo1);
-		Table in = streamTableEnv.fromDataStream(ds, String.join(",", testColumns1));
-		streamTableEnv.registerTable(srcTableName, in);
-
-		Map<String, String> tableProperties = hbaseTableProperties();
-		TableSource source = TableFactoryService
-			.find(HBaseTableFactory.class, tableProperties)
-			.createTableSource(tableProperties);
-
-		streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
-
-		// perform a temporal table join query
-		String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM testStreamSrcTable1, LATERAL TABLE(hbaseLookup(a))";
-		Table result = streamTableEnv.sqlQuery(sqlQuery);
-
-		DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-
-		streamEnv.execute();
-
-		List<String> expected = new ArrayList<>();
-		expected.add("1,10,Welt-1");
-		expected.add("2,20,Welt-2");
-		expected.add("3,30,Welt-3");
-		expected.add("3,30,Welt-3");
-
-		StreamITCase.compareWithList(expected);
-	}
-}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
deleted file mode 100644
index ef1445d..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * IT case Test for {@link HBaseUpsertTableSink}.
- */
-public class HBaseSinkITCase extends HBaseTestingClusterAutostarter {
-	private static final long serialVersionUID = 1L;
-
-	private static final String TEST_TABLE = "testTable";
-
-	private static final String FAMILY1 = "family1";
-	private static final String F1COL1 = "col1";
-
-	private static final String FAMILY2 = "family2";
-	private static final String F2COL1 = "col1";
-	private static final String F2COL2 = "col2";
-
-	private static final String FAMILY3 = "family3";
-	private static final String F3COL1 = "col1";
-	private static final String F3COL2 = "col2";
-	private static final String F3COL3 = "col3";
-
-	// prepare a source collection.
-	private static final List<Row> testData1 = new ArrayList<>();
-	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
-		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
-
-	static {
-		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
-	}
-
-	@BeforeClass
-	public static void activateHBaseCluster() throws IOException {
-		registerHBaseMiniClusterInClasspath();
-		createHBaseTable();
-	}
-
-	private static void createHBaseTable() {
-		// create a table
-		TableName tableName = TableName.valueOf(TEST_TABLE);
-		// column families
-		byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
-		// split keys
-		byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
-		createTable(tableName, families, splitKeys);
-	}
-
-	@Test
-	public void testTableSink() throws Exception {
-		HBaseTableSchema schema = new HBaseTableSchema();
-		schema.addColumn(FAMILY1, F1COL1, Integer.class);
-		schema.addColumn(FAMILY2, F2COL1, String.class);
-		schema.addColumn(FAMILY2, F2COL2, Long.class);
-		schema.setRowKey("rk", Integer.class);
-		schema.addColumn(FAMILY3, F3COL1, Double.class);
-		schema.addColumn(FAMILY3, F3COL2, Boolean.class);
-		schema.addColumn(FAMILY3, F3COL3, String.class);
-
-		Map<String, String> tableProperties = new HashMap<>();
-		tableProperties.put("connector.type", "hbase");
-		tableProperties.put("connector.version", "1.4.3");
-		tableProperties.put("connector.property-version", "1");
-		tableProperties.put("connector.table-name", TEST_TABLE);
-		tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
-		tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
-		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-		descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
-		descriptorProperties.putProperties(tableProperties);
-		TableSink tableSink = TableFactoryService
-			.find(HBaseTableFactory.class, descriptorProperties.asMap())
-			.createTableSink(descriptorProperties.asMap());
-
-		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-		execEnv.setParallelism(4);
-		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv);
-		StreamITCase.clear();
-
-		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
-		tEnv.registerDataStream("src", ds);
-		tEnv.registerTableSink("hbase", tableSink);
-
-		String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
-		tEnv.sqlUpdate(query);
-
-		// wait to finish
-		tEnv.execute("HBase Job");
-
-		// start a batch scan job to verify contents in HBase table
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
-
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
-		hbaseTable.setRowKey("rowkey", Integer.class);
-		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
-		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
-		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
-		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
-		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
-		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-		tableEnv.registerTableSource("hTable", hbaseTable);
-
-		Table result = tableEnv.sqlQuery(
-			"SELECT " +
-				"  h.rowkey, " +
-				"  h.family1.col1, " +
-				"  h.family2.col1, " +
-				"  h.family2.col2, " +
-				"  h.family3.col1, " +
-				"  h.family3.col2, " +
-				"  h.family3.col3 " +
-				"FROM hTable AS h"
-		);
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
-		String expected =
-				"1,10,Hello-1,100,1.01,false,Welt-1\n" +
-				"2,20,Hello-2,200,2.02,true,Welt-2\n" +
-				"3,30,Hello-3,300,3.03,false,Welt-3\n" +
-				"4,40,,400,4.04,true,Welt-4\n" +
-				"5,50,Hello-5,500,5.05,false,Welt-5\n" +
-				"6,60,Hello-6,600,6.06,true,Welt-6\n" +
-				"7,70,Hello-7,700,7.07,false,Welt-7\n" +
-				"8,80,,800,8.08,true,Welt-8\n";
-
-		TestBaseUtils.compareResultAsText(results, expected);
-	}
-}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
new file mode 100644
index 0000000..32baf0c
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.BLINK_PLANNER;
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+
+/**
+ * Abstract IT case class for HBase.
+ */
+@RunWith(Parameterized.class)
+public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
+
+	protected static final String TEST_TABLE_1 = "testTable1";
+	protected static final String TEST_TABLE_2 = "testTable2";
+
+	protected static final String ROWKEY = "rk";
+	protected static final String FAMILY1 = "family1";
+	protected static final String F1COL1 = "col1";
+
+	protected static final String FAMILY2 = "family2";
+	protected static final String F2COL1 = "col1";
+	protected static final String F2COL2 = "col2";
+
+	protected static final String FAMILY3 = "family3";
+	protected static final String F3COL1 = "col1";
+	protected static final String F3COL2 = "col2";
+	protected static final String F3COL3 = "col3";
+
+	private static final byte[][] FAMILIES = new byte[][]{
+		Bytes.toBytes(FAMILY1),
+		Bytes.toBytes(FAMILY2),
+		Bytes.toBytes(FAMILY3)
+	};
+
+	private static final byte[][] SPLIT_KEYS = new byte[][]{Bytes.toBytes(4)};
+
+	@Parameterized.Parameter
+	public PlannerType planner;
+	protected EnvironmentSettings streamSettings;
+	protected EnvironmentSettings batchSettings;
+
+	@Parameterized.Parameters(name = "planner = {0}")
+	public static PlannerType[] getPlanner() {
+		return new PlannerType[]{BLINK_PLANNER, OLD_PLANNER};
+	}
+
+	@BeforeClass
+	public static void activateHBaseCluster() throws IOException {
+		registerHBaseMiniClusterInClasspath();
+		prepareTables();
+	}
+
+	@Before
+	public void before() {
+		EnvironmentSettings.Builder streamBuilder = EnvironmentSettings.newInstance().inStreamingMode();
+		EnvironmentSettings.Builder batchBuilder = EnvironmentSettings.newInstance().inBatchMode();
+		if (BLINK_PLANNER.equals(planner)) {
+			this.streamSettings = streamBuilder.useBlinkPlanner().build();
+			this.batchSettings = batchBuilder.useBlinkPlanner().build();
+		} else if (OLD_PLANNER.equals(planner)) {
+			this.streamSettings = streamBuilder.useOldPlanner().build();
+			this.batchSettings = batchBuilder.useOldPlanner().build();
+		} else {
+			throw new IllegalArgumentException("Unsupported planner name " + planner);
+		}
+	}
+
+	private static void prepareTables() throws IOException {
+		createHBaseTable1();
+		createHBaseTable2();
+	}
+
+	private static void createHBaseTable1() throws IOException {
+		// create a table
+		TableName tableName = TableName.valueOf(TEST_TABLE_1);
+		createTable(tableName, FAMILIES, SPLIT_KEYS);
+
+		// get the HTable instance
+		HTable table = openTable(tableName);
+		List<Put> puts = new ArrayList<>();
+		// add some data
+		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+		// append rows to table
+		table.put(puts);
+		table.close();
+	}
+
+	private static void createHBaseTable2() {
+		// create a table
+		TableName tableName = TableName.valueOf(TEST_TABLE_2);
+		createTable(tableName, FAMILIES, SPLIT_KEYS);
+	}
+
+	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+		Put put = new Put(Bytes.toBytes(rowKey));
+		// family 1
+		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+		// family 2
+		if (f2c1 != null) {
+			put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+		}
+		put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+		// family 3
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+		return put;
+	}
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
similarity index 97%
rename from flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
rename to flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
index 905f80d..b515c2b 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
@@ -18,9 +18,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.addons.hbase;
+package org.apache.flink.addons.hbase.util;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
@@ -74,9 +73,9 @@ import static org.junit.Assert.fail;
 //
 // https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
 //
-public class HBaseTestingClusterAutostarter extends TestLogger implements Serializable {
+public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
 
-	private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class);
+	private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutoStarter.class);
 
 	private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 	private static HBaseAdmin admin = null;
@@ -160,7 +159,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
 	/**
 	 * Returns zookeeper quorum value contains the right port number (varies per run).
 	 */
-	static String getZookeeperQuorum() {
+	protected static String getZookeeperQuorum() {
 		return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
 	}
 
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
new file mode 100644
index 0000000..cd1bf39
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+/**
+ * Planner type to use.
+ */
+public enum PlannerType {
+	BLINK_PLANNER,
+	OLD_PLANNER
+}