You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/25 06:36:48 UTC

[flink] branch release-1.9 updated: [FLINK-13210][hive] Hive connector test should depend on blink planner instead of legacy planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f3c7450  [FLINK-13210][hive] Hive connector test should depend on blink planner instead of legacy planner
f3c7450 is described below

commit f3c7450f33b0925c8d2434a5f9bdb8e7fa62fb55
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 19 16:13:28 2019 +0800

    [FLINK-13210][hive] Hive connector test should depend on blink planner instead of legacy planner
    
    This closes #9181
---
 flink-connectors/flink-connector-hive/pom.xml      |  15 +++
 .../batch/connectors/hive/HiveTableSource.java     |   8 ++
 .../connectors/hive/HiveTableFactoryTest.java      |  14 +--
 .../batch/connectors/hive/HiveTableSinkTest.java   | 125 +++++++++++++++------
 .../batch/connectors/hive/HiveTableSourceTest.java |  42 ++-----
 .../flink/table/catalog/hive/HiveTestUtils.java    |  11 ++
 flink-table/flink-table-planner-blink/pom.xml      |  12 ++
 7 files changed, 149 insertions(+), 78 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 95f2449..6ce3dc2 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -354,11 +354,26 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<!--flink-java and flink-clients test dependencies used for HiveInputFormatTest-->
 
 		<dependency>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 31b896f..706442d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -19,6 +19,8 @@
 package org.apache.flink.batch.connectors.hive;
 
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -111,6 +113,12 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 	}
 
 	@Override
+	public TypeInformation<Row> getReturnType() {
+		TableSchema tableSchema = catalogTable.getSchema();
+		return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+	}
+
+	@Override
 	public List<Map<String, String>> getPartitions() {
 		if (!initAllPartitions) {
 			initAllPartitions();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 237146d..8784cd8 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -61,7 +61,7 @@ public class HiveTableFactoryTest {
 	}
 
 	@Test
-	public void testCsvTable() throws Exception {
+	public void testGenericTable() throws Exception {
 		TableSchema schema = TableSchema.builder()
 			.field("name", DataTypes.STRING())
 			.field("age", DataTypes.INT())
@@ -69,17 +69,7 @@ public class HiveTableFactoryTest {
 
 		Map<String, String> properties = new HashMap<>();
 		properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
-		properties.put("connector.type", "filesystem");
-		properties.put("connector.path", "/tmp");
-		properties.put("connector.property-version", "1");
-		properties.put("update-mode", "append");
-
-		properties.put("format.type", "csv");
-		properties.put("format.property-version", "1");
-		properties.put("format.fields.0.name", "name");
-		properties.put("format.fields.0.type", "STRING");
-		properties.put("format.fields.1.name", "age");
-		properties.put("format.fields.1.type", "INT");
+		properties.put("connector", "COLLECTION");
 
 		catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
 		ObjectPath path = new ObjectPath("mydb", "mytable");
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index d16f2b0..80ad25f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -18,18 +18,24 @@
 
 package org.apache.flink.batch.connectors.hive;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import com.klarna.hiverunner.HiveShell;
@@ -44,9 +50,12 @@ import org.junit.runner.RunWith;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
@@ -83,14 +92,14 @@ public class HiveTableSinkTest {
 		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		List<Row> toWrite = generateRecords(5);
-		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+		Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("src", src);
 
 		tableEnv.registerCatalog("hive", hiveCatalog);
 		tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
 		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
 
@@ -104,14 +113,15 @@ public class HiveTableSinkTest {
 		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+
 		List<Row> toWrite = generateRecords(5);
-		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+		Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("src", src);
 
 		tableEnv.registerCatalog("hive", hiveCatalog);
 		tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
 		List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
 		assertEquals(toWrite.size(), partitionSpecs.size());
@@ -150,42 +160,54 @@ public class HiveTableSinkTest {
 		row.setField(2, struct);
 		toWrite.add(row);
 
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
-		tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+		Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("complexSrc", src);
 
 		tableEnv.registerCatalog("hive", hiveCatalog);
 		tableEnv.sqlQuery("select * from complexSrc").insertInto("hive", "default", "dest");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
 		List<String> result = hiveShell.executeQuery("select * from " + tblName);
 		assertEquals(1, result.size());
 		assertEquals("[1,2,3]\t{1:\"a\",2:\"b\"}\t{\"f1\":3,\"f2\":\"c\"}", result.get(0));
+
 		hiveCatalog.dropTable(tablePath, false);
+	}
+
+	@Test
+	public void testWriteNestedComplexType() throws Exception {
+		String dbName = "default";
+		String tblName = "dest";
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 
 		// nested complex types
-		builder = new TableSchema.Builder();
+		TableSchema.Builder builder = new TableSchema.Builder();
 		// array of rows
 		builder.fields(new String[]{"a"}, new DataType[]{DataTypes.ARRAY(
 				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING())))});
-		rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
-		row = new Row(rowTypeInfo.getArity());
-		array = new Object[3];
+		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
+		Row row = new Row(rowTypeInfo.getArity());
+		Object[] array = new Object[3];
 		row.setField(0, array);
 		for (int i = 0; i < array.length; i++) {
-			struct = new Row(2);
+			Row struct = new Row(2);
 			struct.setField(0, 1 + i);
 			struct.setField(1, String.valueOf((char) ('a' + i)));
 			array[i] = struct;
 		}
-		toWrite.clear();
+		List<Row> toWrite = new ArrayList<>();
 		toWrite.add(row);
 
-		tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+
+		Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("nestedSrc", src);
+		tableEnv.registerCatalog("hive", hiveCatalog);
 		tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive", "default", "dest");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
-		result = hiveShell.executeQuery("select * from " + tblName);
+		List<String> result = hiveShell.executeQuery("select * from " + tblName);
 		assertEquals(1, result.size());
 		assertEquals("[{\"f1\":1,\"f2\":\"a\"},{\"f1\":2,\"f2\":\"b\"},{\"f1\":3,\"f2\":\"c\"}]", result.get(0));
 		hiveCatalog.dropTable(tablePath, false);
@@ -198,10 +220,10 @@ public class HiveTableSinkTest {
 		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		List<Row> toWrite = generateRecords(1);
-		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+		Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("src", src);
 
 		Map<String, String> partSpec = new HashMap<>();
 		partSpec.put("s", "a");
@@ -211,7 +233,7 @@ public class HiveTableSinkTest {
 		hiveTableSink.setStaticPartition(partSpec);
 		tableEnv.registerTableSink("destSink", hiveTableSink);
 		tableEnv.sqlQuery("select * from src").insertInto("destSink");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
 		// make sure new partition is created
 		assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
@@ -228,29 +250,30 @@ public class HiveTableSinkTest {
 		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 
 		// write some data and verify
 		List<Row> toWrite = generateRecords(5);
-		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+		Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("src", src);
 
 		CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
 		tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
 		tableEnv.sqlQuery("select * from src").insertInto("destSink");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
 		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
 
 		// write some data to overwrite existing data and verify
 		toWrite = generateRecords(3);
-		tableEnv.registerDataSet("src1", execEnv.fromCollection(toWrite, rowTypeInfo));
+		Table src1 = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+		tableEnv.registerTable("src1", src1);
 
 		HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
 		sink.setOverwrite(true);
 		tableEnv.registerTableSink("destSink1", sink);
 		tableEnv.sqlQuery("select * from src1").insertInto("destSink1");
-		execEnv.execute();
+		tableEnv.execute("mytest");
 
 		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
 
@@ -285,9 +308,11 @@ public class HiveTableSinkTest {
 
 	private void verifyWrittenData(List<Row> expected, List<String> results) throws Exception {
 		assertEquals(expected.size(), results.size());
+		Set<String> expectedSet = new HashSet<>();
 		for (int i = 0; i < results.size(); i++) {
-			assertEquals(expected.get(i).toString().replaceAll(",", "\t"), results.get(i));
+			expectedSet.add(expected.get(i).toString().replaceAll(",", "\t"));
 		}
+		assertEquals(expectedSet, new HashSet<>(results));
 	}
 
 	private List<Row> generateRecords(int numRecords) {
@@ -303,4 +328,36 @@ public class HiveTableSinkTest {
 		}
 		return res;
 	}
+
+	private static class CollectionTableSource extends InputFormatTableSource<Row> {
+
+		private final Collection<Row> data;
+		private final RowTypeInfo rowTypeInfo;
+
+		CollectionTableSource(Collection<Row> data, RowTypeInfo rowTypeInfo) {
+			this.data = data;
+			this.rowTypeInfo = rowTypeInfo;
+		}
+
+		@Override
+		public DataType getProducedDataType() {
+			return TypeConversions.fromLegacyInfoToDataType(rowTypeInfo);
+		}
+
+		@Override
+		public TypeInformation<Row> getReturnType() {
+			return rowTypeInfo;
+		}
+
+		@Override
+		public InputFormat<Row, ?> getInputFormat() {
+			return new CollectionInputFormat<>(data, rowTypeInfo.createSerializer(new ExecutionConfig()));
+		}
+
+		@Override
+		public TableSchema getTableSchema() {
+			return new TableSchema.Builder().fields(rowTypeInfo.getFieldNames(),
+					TypeConversions.fromLegacyInfoToDataType(rowTypeInfo.getFieldTypes())).build();
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index e127639..1283421 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.batch.connectors.hive;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
 import org.apache.flink.types.Row;
 
 import com.klarna.hiverunner.HiveShell;
@@ -46,6 +42,8 @@ import org.junit.runner.RunWith;
 import java.io.IOException;
 import java.util.List;
 
+import scala.collection.JavaConverters;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -93,24 +91,13 @@ public class HiveTableSourceTest {
 				.addRow(4, 4, "d", 4000L, 4.44)
 				.commit();
 
-		TableSchema tableSchema = new TableSchema(
-				new String[]{"a", "b", "c", "d", "e"},
-				new TypeInformation[]{
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO}
-		);
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
 		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
 		Table src = tEnv.fromTableSource(hiveTableSource);
-		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(),
-																	tableSchema.getFieldNames()));
-		List<Row> rows = rowDataSet.collect();
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+
 		Assert.assertEquals(4, rows.size());
 		Assert.assertEquals(1, rows.get(0).getField(0));
 		Assert.assertEquals(2, rows.get(1).getField(0));
@@ -135,21 +122,12 @@ public class HiveTableSourceTest {
 				.addRow("2015", 2, 1)
 				.addRow("2015", 5, 1)
 				.commit();
-		TableSchema tableSchema = new TableSchema(
-				new String[]{"year", "value", "int"},
-				new TypeInformation[]{
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO}
-		);
-		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
-		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
 		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
 		Table src = tEnv.fromTableSource(hiveTableSource);
-		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
-		List<Row> rows = rowDataSet.collect();
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
 		assertEquals(4, rows.size());
 		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 82e4d8e..04220c6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
@@ -32,6 +34,8 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM;
+
 /**
  * Test utils for Hive connector.
  */
@@ -95,4 +99,11 @@ public class HiveTestUtils {
 		}
 		throw new RuntimeException("Exhausted all ephemeral ports and didn't find a free one");
 	}
+
+	public static TableEnvironment createTableEnv() {
+		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+		TableEnvironment tableEnv = TableEnvironment.create(settings);
+		tableEnv.getConfig().getConfiguration().setInteger(SQL_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+		return tableEnv;
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index dc56f44..4f71191 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -388,6 +388,18 @@ under the License.
 					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>