Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/25 06:28:18 UTC
[flink] branch master updated: [FLINK-13210][hive] Hive connector test should depend on blink planner instead of legacy planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d57741c [FLINK-13210][hive] Hive connector test should depend on blink planner instead of legacy planner
d57741c is described below
commit d57741cef9d4773cc487418baa961254d0d47524
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 19 16:13:28 2019 +0800
[FLINK-13210][hive] Hive connector test should depend on blink planner instead of legacy planner
This closes #9181
---
flink-connectors/flink-connector-hive/pom.xml | 15 +++
.../batch/connectors/hive/HiveTableSource.java | 8 ++
.../connectors/hive/HiveTableFactoryTest.java | 14 +--
.../batch/connectors/hive/HiveTableSinkTest.java | 125 +++++++++++++++------
.../batch/connectors/hive/HiveTableSourceTest.java | 42 ++-----
.../flink/table/catalog/hive/HiveTestUtils.java | 11 ++
flink-table/flink-table-planner-blink/pom.xml | 12 ++
7 files changed, 149 insertions(+), 78 deletions(-)
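At a high level, the patch moves the Hive connector tests off the legacy ExecutionEnvironment/BatchTableEnvironment pair and onto a batch TableEnvironment backed by the blink planner. A minimal sketch of the new setup (illustrative only, not part of the patch; the tests use the HiveTestUtils.createTableEnv() helper added further down):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BlinkBatchEnvSketch {
        public static void main(String[] args) {
            // Blink planner in batch mode replaces BatchTableEnvironment.create(execEnv).
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);
            // Tests register sources and catalogs on tableEnv and submit jobs via tableEnv.execute("jobName").
        }
    }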
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index b8f010c..3b4bc61 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -354,11 +354,26 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!--flink-java and flink-clients test dependencies used for HiveInputFormatTest-->
<dependency>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 31b896f..706442d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -19,6 +19,8 @@
package org.apache.flink.batch.connectors.hive;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
@@ -111,6 +113,12 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
}
@Override
+ public TypeInformation<Row> getReturnType() {
+ TableSchema tableSchema = catalogTable.getSchema();
+ return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ }
+
+ @Override
public List<Map<String, String>> getPartitions() {
if (!initAllPartitions) {
initAllPartitions();
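For reference, the getReturnType() added above simply exposes the Hive table's schema as a legacy RowTypeInfo (field names plus legacy types). The same conversion in isolation, with an illustrative schema in place of catalogTable.getSchema():

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.types.Row;

    public class ReturnTypeSketch {
        public static void main(String[] args) {
            // Illustrative schema; in HiveTableSource it comes from catalogTable.getSchema().
            TableSchema schema = TableSchema.builder()
                    .field("name", DataTypes.STRING())
                    .field("age", DataTypes.INT())
                    .build();
            // getFieldTypes() yields legacy TypeInformation[], matching what
            // getReturnType() must report for an InputFormatTableSource.
            TypeInformation<Row> returnType =
                    new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
            System.out.println(returnType);
        }
    }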
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 237146d..8784cd8 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -61,7 +61,7 @@ public class HiveTableFactoryTest {
}
@Test
- public void testCsvTable() throws Exception {
+ public void testGenericTable() throws Exception {
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
@@ -69,17 +69,7 @@ public class HiveTableFactoryTest {
Map<String, String> properties = new HashMap<>();
properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
- properties.put("connector.type", "filesystem");
- properties.put("connector.path", "/tmp");
- properties.put("connector.property-version", "1");
- properties.put("update-mode", "append");
-
- properties.put("format.type", "csv");
- properties.put("format.property-version", "1");
- properties.put("format.fields.0.name", "name");
- properties.put("format.fields.0.type", "STRING");
- properties.put("format.fields.1.name", "age");
- properties.put("format.fields.1.type", "INT");
+ properties.put("connector", "COLLECTION");
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
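The factory test now stores a generic (IS_GENERIC = true) table that uses the in-memory COLLECTION test connector instead of a filesystem/CSV table. A hedged sketch of how such a catalog table is assembled (variable names and the comment string are illustrative):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.CatalogTableImpl;
    import org.apache.flink.table.catalog.config.CatalogConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class GenericTableSketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                    .field("name", DataTypes.STRING())
                    .field("age", DataTypes.INT())
                    .build();
            Map<String, String> properties = new HashMap<>();
            // Mark the table as generic so HiveCatalog stores it as a non-Hive table.
            properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
            // COLLECTION is a test-only in-memory connector.
            properties.put("connector", "COLLECTION");
            CatalogTableImpl catalogTable = new CatalogTableImpl(schema, properties, "a generic test table");
            System.out.println(catalogTable.getProperties());
        }
    }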
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index d16f2b0..80ad25f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -18,18 +18,24 @@
package org.apache.flink.batch.connectors.hive;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import com.klarna.hiverunner.HiveShell;
@@ -44,9 +50,12 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -83,14 +92,14 @@ public class HiveTableSinkTest {
RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
List<Row> toWrite = generateRecords(5);
- tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+ Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("src", src);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
- execEnv.execute();
+ tableEnv.execute("mytest");
verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
@@ -104,14 +113,15 @@ public class HiveTableSinkTest {
RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+
List<Row> toWrite = generateRecords(5);
- tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+ Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("src", src);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
- execEnv.execute();
+ tableEnv.execute("mytest");
List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
assertEquals(toWrite.size(), partitionSpecs.size());
@@ -150,42 +160,54 @@ public class HiveTableSinkTest {
row.setField(2, struct);
toWrite.add(row);
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
- tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+ Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("complexSrc", src);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from complexSrc").insertInto("hive", "default", "dest");
- execEnv.execute();
+ tableEnv.execute("mytest");
List<String> result = hiveShell.executeQuery("select * from " + tblName);
assertEquals(1, result.size());
assertEquals("[1,2,3]\t{1:\"a\",2:\"b\"}\t{\"f1\":3,\"f2\":\"c\"}", result.get(0));
+
hiveCatalog.dropTable(tablePath, false);
+ }
+
+ @Test
+ public void testWriteNestedComplexType() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
// nested complex types
- builder = new TableSchema.Builder();
+ TableSchema.Builder builder = new TableSchema.Builder();
// array of rows
builder.fields(new String[]{"a"}, new DataType[]{DataTypes.ARRAY(
DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING())))});
- rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
- row = new Row(rowTypeInfo.getArity());
- array = new Object[3];
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
+ Row row = new Row(rowTypeInfo.getArity());
+ Object[] array = new Object[3];
row.setField(0, array);
for (int i = 0; i < array.length; i++) {
- struct = new Row(2);
+ Row struct = new Row(2);
struct.setField(0, 1 + i);
struct.setField(1, String.valueOf((char) ('a' + i)));
array[i] = struct;
}
- toWrite.clear();
+ List<Row> toWrite = new ArrayList<>();
toWrite.add(row);
- tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+
+ Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("nestedSrc", src);
+ tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive", "default", "dest");
- execEnv.execute();
+ tableEnv.execute("mytest");
- result = hiveShell.executeQuery("select * from " + tblName);
+ List<String> result = hiveShell.executeQuery("select * from " + tblName);
assertEquals(1, result.size());
assertEquals("[{\"f1\":1,\"f2\":\"a\"},{\"f1\":2,\"f2\":\"b\"},{\"f1\":3,\"f2\":\"c\"}]", result.get(0));
hiveCatalog.dropTable(tablePath, false);
@@ -198,10 +220,10 @@ public class HiveTableSinkTest {
RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
List<Row> toWrite = generateRecords(1);
- tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+ Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("src", src);
Map<String, String> partSpec = new HashMap<>();
partSpec.put("s", "a");
@@ -211,7 +233,7 @@ public class HiveTableSinkTest {
hiveTableSink.setStaticPartition(partSpec);
tableEnv.registerTableSink("destSink", hiveTableSink);
tableEnv.sqlQuery("select * from src").insertInto("destSink");
- execEnv.execute();
+ tableEnv.execute("mytest");
// make sure new partition is created
assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
@@ -228,29 +250,30 @@ public class HiveTableSinkTest {
RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
// write some data and verify
List<Row> toWrite = generateRecords(5);
- tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+ Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("src", src);
CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
tableEnv.sqlQuery("select * from src").insertInto("destSink");
- execEnv.execute();
+ tableEnv.execute("mytest");
verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
// write some data to overwrite existing data and verify
toWrite = generateRecords(3);
- tableEnv.registerDataSet("src1", execEnv.fromCollection(toWrite, rowTypeInfo));
+ Table src1 = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
+ tableEnv.registerTable("src1", src1);
HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
sink.setOverwrite(true);
tableEnv.registerTableSink("destSink1", sink);
tableEnv.sqlQuery("select * from src1").insertInto("destSink1");
- execEnv.execute();
+ tableEnv.execute("mytest");
verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
@@ -285,9 +308,11 @@ public class HiveTableSinkTest {
private void verifyWrittenData(List<Row> expected, List<String> results) throws Exception {
assertEquals(expected.size(), results.size());
+ Set<String> expectedSet = new HashSet<>();
for (int i = 0; i < results.size(); i++) {
- assertEquals(expected.get(i).toString().replaceAll(",", "\t"), results.get(i));
+ expectedSet.add(expected.get(i).toString().replaceAll(",", "\t"));
}
+ assertEquals(expectedSet, new HashSet<>(results));
}
private List<Row> generateRecords(int numRecords) {
@@ -303,4 +328,36 @@ public class HiveTableSinkTest {
}
return res;
}
+
+ private static class CollectionTableSource extends InputFormatTableSource<Row> {
+
+ private final Collection<Row> data;
+ private final RowTypeInfo rowTypeInfo;
+
+ CollectionTableSource(Collection<Row> data, RowTypeInfo rowTypeInfo) {
+ this.data = data;
+ this.rowTypeInfo = rowTypeInfo;
+ }
+
+ @Override
+ public DataType getProducedDataType() {
+ return TypeConversions.fromLegacyInfoToDataType(rowTypeInfo);
+ }
+
+ @Override
+ public TypeInformation<Row> getReturnType() {
+ return rowTypeInfo;
+ }
+
+ @Override
+ public InputFormat<Row, ?> getInputFormat() {
+ return new CollectionInputFormat<>(data, rowTypeInfo.createSerializer(new ExecutionConfig()));
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return new TableSchema.Builder().fields(rowTypeInfo.getFieldNames(),
+ TypeConversions.fromLegacyInfoToDataType(rowTypeInfo.getFieldTypes())).build();
+ }
+ }
}
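Because the blink-planner TableEnvironment has no registerDataSet(), the sink tests feed records through the CollectionTableSource defined above. A condensed sketch of that write path as a hypothetical helper inside HiveTableSinkTest (hiveCatalog and the table names come from the test fixture):

    // Hypothetical helper; mirrors the pattern used by the sink tests above.
    private void writeRows(List<Row> rows, RowTypeInfo rowTypeInfo) throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
        Table src = tableEnv.fromTableSource(new CollectionTableSource(rows, rowTypeInfo));
        tableEnv.registerTable("src", src);
        tableEnv.registerCatalog("hive", hiveCatalog);
        tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
        tableEnv.execute("mytest");
    }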
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index e127639..1283421 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -18,18 +18,14 @@
package org.apache.flink.batch.connectors.hive;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
import org.apache.flink.types.Row;
import com.klarna.hiverunner.HiveShell;
@@ -46,6 +42,8 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.List;
+import scala.collection.JavaConverters;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -93,24 +91,13 @@ public class HiveTableSourceTest {
.addRow(4, 4, "d", 4000L, 4.44)
.commit();
- TableSchema tableSchema = new TableSchema(
- new String[]{"a", "b", "c", "d", "e"},
- new TypeInformation[]{
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO}
- );
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+ TableEnvironment tEnv = HiveTestUtils.createTableEnv();
ObjectPath tablePath = new ObjectPath(dbName, tblName);
CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
Table src = tEnv.fromTableSource(hiveTableSource);
- DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(),
- tableSchema.getFieldNames()));
- List<Row> rows = rowDataSet.collect();
+ List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+
Assert.assertEquals(4, rows.size());
Assert.assertEquals(1, rows.get(0).getField(0));
Assert.assertEquals(2, rows.get(1).getField(0));
@@ -135,21 +122,12 @@ public class HiveTableSourceTest {
.addRow("2015", 2, 1)
.addRow("2015", 5, 1)
.commit();
- TableSchema tableSchema = new TableSchema(
- new String[]{"year", "value", "int"},
- new TypeInformation[]{
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO}
- );
- ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+ TableEnvironment tEnv = HiveTestUtils.createTableEnv();
ObjectPath tablePath = new ObjectPath(dbName, tblName);
CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
Table src = tEnv.fromTableSource(hiveTableSource);
- DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
- List<Row> rows = rowDataSet.collect();
+ List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
assertEquals(4, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
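On the read side, toDataSet() is not available on the unified TableEnvironment, so results are collected through the blink planner's TableUtil, which ships in the planner's test-jar (hence the new test dependencies in the pom). A sketch of that read path as a hypothetical helper inside HiveTableSourceTest:

    // Hypothetical helper; mirrors the collection pattern used by the source tests above.
    private List<Row> collectRows(HiveTableSource hiveTableSource) {
        TableEnvironment tEnv = HiveTestUtils.createTableEnv();
        Table src = tEnv.fromTableSource(hiveTableSource);
        // TableUtil.collect returns a Scala Seq[Row]; convert it to a java.util.List.
        return JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
    }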
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 82e4d8e..04220c6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTest;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
@@ -32,6 +34,8 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM;
+
/**
* Test utils for Hive connector.
*/
@@ -95,4 +99,11 @@ public class HiveTestUtils {
}
throw new RuntimeException("Exhausted all ephemeral ports and didn't find a free one");
}
+
+ public static TableEnvironment createTableEnv() {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironment.create(settings);
+ tableEnv.getConfig().getConfiguration().setInteger(SQL_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+ return tableEnv;
+ }
}
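createTableEnv() is what the tests now call in place of ExecutionEnvironment.createLocalEnvironment(1); the parallelism is pinned through the blink planner's SQL_RESOURCE_DEFAULT_PARALLELISM option rather than through an execution environment. A minimal usage sketch (illustrative only):

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveTestUtils;

    import static org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM;

    public class CreateTableEnvUsage {
        public static void main(String[] args) {
            // Blink batch environment with operator parallelism pinned to 1 by the helper.
            TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
            System.out.println(tableEnv.getConfig().getConfiguration()
                    .getInteger(SQL_RESOURCE_DEFAULT_PARALLELISM.key(), -1));
        }
    }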
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index cafd4e6..04e5f71 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -388,6 +388,18 @@ under the License.
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>