You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/26 06:30:45 UTC

[flink] branch release-1.11 updated (643f4a1 -> 2f92386)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 643f4a1  [FLINK-17889][hive] Hive can not work with filesystem connector
     new da1ccb9  [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic
     new 52b7e87  [FLINK-17867][hive][test] Add hdfs dependency to hive-3.1.1 test
     new d836cd3  [FLINK-17456][hive][test] Update hive connector tests to execute DDL & DML via TableEnvironment
     new 2f92386  [hotfix] Remove generic row for HiveTableFactory

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connectors/flink-connector-hive/pom.xml      |   8 +
 .../connectors/hive/HiveDynamicTableFactory.java   |  88 ++++++++
 .../flink/connectors/hive/HiveTableFactory.java    |   5 +-
 .../flink/table/catalog/hive/HiveCatalog.java      |   7 +
 .../flink/connectors/hive/HiveLookupJoinTest.java  |   2 +-
 .../flink/connectors/hive/HiveTableSinkTest.java   |  12 +-
 .../flink/connectors/hive/HiveTableSourceTest.java | 132 ++++++------
 .../connectors/hive/TableEnvHiveConnectorTest.java | 232 +++++++++++----------
 .../table/catalog/hive/HiveCatalogITCase.java      |  44 +++-
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    |  29 ++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |  12 ++
 11 files changed, 355 insertions(+), 216 deletions(-)
 create mode 100644 flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java


[flink] 01/04: [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da1ccb9cfae8be0abda7e85d15db3d38304336d2
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon May 25 14:22:50 2020 +0800

    [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic
    
    This closes #12316
---
 .../connectors/hive/HiveDynamicTableFactory.java   | 88 ++++++++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalog.java      |  7 ++
 .../table/catalog/hive/HiveCatalogITCase.java      | 44 ++++++++++-
 3 files changed, 135 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
new file mode 100644
index 0000000..56086fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.table.catalog.config.CatalogConfig.IS_GENERIC;
+
+/**
+ * A dynamic table factory implementation for Hive catalog. Currently it only supports generic tables.
+ * Hive tables should be resolved by {@link HiveTableFactory}.
+ */
+public class HiveDynamicTableFactory implements
+		DynamicTableSourceFactory,
+		DynamicTableSinkFactory {
+
+	@Override
+	public String factoryIdentifier() {
+		throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+	}
+
+	private static CatalogTable removeIsGenericFlag(CatalogTable table) {
+		Map<String, String> newOptions = new HashMap<>(table.getOptions());
+		boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC));
+		if (!isGeneric) {
+			throw new ValidationException(
+					"Hive dynamic table factory now only work for generic table.");
+		}
+		return table.copy(newOptions);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		return FactoryUtil.createTableSink(
+				null, // we are already inside the catalog's factory
+				context.getObjectIdentifier(),
+				removeIsGenericFlag(context.getCatalogTable()),
+				context.getConfiguration(),
+				context.getClassLoader());
+	}
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		return FactoryUtil.createTableSource(
+				null, // we are already inside the catalog's factory
+				context.getObjectIdentifier(),
+				removeIsGenericFlag(context.getCatalogTable()),
+				context.getConfiguration(),
+				context.getClassLoader());
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 5bc469f..d10127f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
@@ -69,6 +70,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.Preconditions;
@@ -238,6 +240,11 @@ public class HiveCatalog extends AbstractCatalog {
 	}
 
 	@Override
+	public Optional<Factory> getFactory() {
+		return Optional.of(new HiveDynamicTableFactory());
+	}
+
+	@Override
 	public Optional<TableFactory> getTableFactory() {
 		return Optional.of(new HiveTableFactory(hiveConf));
 	}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 4064057..2817361 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -52,8 +51,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileReader;
+import java.io.PrintStream;
 import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -63,6 +64,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -237,7 +239,7 @@ public class HiveCatalogITCase {
 		// similar to CatalogTableITCase::testReadWriteCsvUsingDDL but uses HiveCatalog
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
-		tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
 		tableEnv.registerCatalog("myhive", hiveCatalog);
 		tableEnv.useCatalog("myhive");
@@ -282,8 +284,7 @@ public class HiveCatalogITCase {
 		}
 		EnvironmentSettings settings = builder.build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
-		tableEnv.getConfig().getConfiguration().setInteger(
-				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
 		tableEnv.registerCatalog("myhive", hiveCatalog);
 		tableEnv.useCatalog("myhive");
@@ -307,4 +308,39 @@ public class HiveCatalogITCase {
 		Assert.assertEquals(5, rows.size());
 		tableEnv.executeSql("DROP TABLE proctime_src");
 	}
+
+	@Test
+	public void testNewTableFactory() {
+		TableEnvironment tEnv = TableEnvironment.create(
+				EnvironmentSettings.newInstance().inBatchMode().build());
+		tEnv.registerCatalog("myhive", hiveCatalog);
+		tEnv.useCatalog("myhive");
+		tEnv.getConfig().getConfiguration().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+		String path = this.getClass().getResource("/csv/test.csv").getPath();
+
+		PrintStream originalSystemOut = System.out;
+		try {
+			ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+			System.setOut(new PrintStream(arrayOutputStream));
+
+			tEnv.executeSql("create table csv_table (name String, age Int) with (" +
+					"'connector.type' = 'filesystem'," +
+					"'connector.path' = 'file://" + path + "'," +
+					"'format.type' = 'csv')");
+			tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
+
+			TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into print_table select * from csv_table");
+
+			// assert query result
+			assertEquals("+I(1,1)\n+I(2,2)\n+I(3,3)\n", arrayOutputStream.toString());
+		} finally {
+			if (System.out != originalSystemOut) {
+				System.out.close();
+			}
+			System.setOut(originalSystemOut);
+			tEnv.executeSql("DROP TABLE csv_table");
+			tEnv.executeSql("DROP TABLE print_table");
+		}
+	}
 }


[flink] 03/04: [FLINK-17456][hive][test] Update hive connector tests to execute DDL & DML via TableEnvironment

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d836cd336f2a0df64d18065ddcb5d1141ed02a0e
Author: Rui Li <li...@apache.org>
AuthorDate: Thu May 21 20:50:31 2020 +0800

    [FLINK-17456][hive][test] Update hive connector tests to execute DDL & DML via TableEnvironment
    
    This closes #12281
---
 .../flink/connectors/hive/HiveLookupJoinTest.java  |   2 +-
 .../flink/connectors/hive/HiveTableSinkTest.java   |  12 +-
 .../flink/connectors/hive/HiveTableSourceTest.java | 132 ++++++------
 .../connectors/hive/TableEnvHiveConnectorTest.java | 232 +++++++++++----------
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    |  29 ++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |  12 ++
 6 files changed, 210 insertions(+), 209 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
index ba17ada..88fae01 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
@@ -109,7 +109,7 @@ public class HiveLookupJoinTest {
 			List<Row> results = Lists.newArrayList(flinkTable.execute().collect());
 			assertEquals("[1,a, 2,b, 3,c]", results.toString());
 		} finally {
-			hiveShell.execute("drop table build");
+			tableEnv.executeSql("drop table build");
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
index d268246..2cc03de 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
@@ -202,19 +202,19 @@ public class HiveTableSinkTest {
 
 	@Test
 	public void testWriteNullValues() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+		tableEnv.executeSql("create database db1");
 		try {
 			// 17 data types
-			hiveShell.execute("create table db1.src" +
+			tableEnv.executeSql("create table db1.src" +
 					"(t tinyint,s smallint,i int,b bigint,f float,d double,de decimal(10,5),ts timestamp,dt date," +
 					"str string,ch char(5),vch varchar(8),bl boolean,bin binary,arr array<int>,mp map<int,string>,strt struct<f1:int,f2:string>)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null})
 					.commit();
 			hiveShell.execute("create table db1.dest like db1.src");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-			tableEnv.useCatalog(hiveCatalog.getName());
 
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
 			List<String> results = hiveShell.executeQuery("select * from db1.dest");
@@ -224,7 +224,7 @@ public class HiveTableSinkTest {
 			assertEquals("NULL", cols[0]);
 			assertEquals(1, new HashSet<>(Arrays.asList(cols)).size());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 6c1c483..f36a60a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -128,10 +128,10 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test";
-		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[] { 1, 1, "a", 1000L, 1.11 })
 				.addRow(new Object[] { 2, 2, "b", 2000L, 2.22 })
@@ -139,8 +139,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[] { 4, 4, "d", 4000L, 4.44 })
 				.commit();
 
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.test");
 		List<Row> rows = Lists.newArrayList(src.execute().collect());
 
@@ -153,10 +151,10 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	@Test
 	public void testReadComplexDataType() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "complex_test";
-		hiveShell.execute("create table source_db.complex_test(" +
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("create table source_db.complex_test(" +
 						"a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");
 		Integer[] array = new Integer[]{1, 2, 3};
 		Map<Integer, String> map = new LinkedHashMap<>();
@@ -166,8 +164,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{array, map, struct})
 				.commit();
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.complex_test");
 		List<Row> rows = Lists.newArrayList(src.execute().collect());
 		Assert.assertEquals(1, rows.size());
@@ -182,11 +178,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 	 */
 	@Test
 	public void testReadPartitionTable() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_table_pt";
-		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
-						"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test_table_pt " +
+						"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -195,8 +191,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
 		List<Row> rows = Lists.newArrayList(src.execute().collect());
 
@@ -207,11 +201,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	@Test
 	public void testPartitionPrunning() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_table_pt_1";
-		hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
-						"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test_table_pt_1 " +
+						"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -220,8 +214,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
 		// first check execution plan to ensure partition prunning works
 		String[] explain = src.explain().split("==.*==\n");
@@ -241,9 +233,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	@Test
 	public void testPartitionFilter() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
+				hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
+		tableEnv.registerCatalog(catalog.getName(), catalog);
+		tableEnv.useCatalog(catalog.getName());
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part(x int) partitioned by (p1 int,p2 string)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (p1 int,p2 string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{1}).commit("p1=1,p2='a'");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
@@ -253,11 +250,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			// test string partition columns with special characters
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{4}).commit("p1=4,p2='c:2'");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
-					hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
-			tableEnv.registerCatalog(catalog.getName(), catalog);
-			tableEnv.useCatalog(catalog.getName());
 			Table query = tableEnv.sqlQuery("select x from db1.part where p1>1 or p2<>'a' order by x");
 			String[] explain = query.explain().split("==.*==\n");
 			assertFalse(catalog.fallback);
@@ -298,15 +290,20 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			results = Lists.newArrayList(query.execute().collect());
 			assertEquals("[4]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testPartitionFilterDateTimestamp() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
+				hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
+		tableEnv.registerCatalog(catalog.getName(), catalog);
+		tableEnv.useCatalog(catalog.getName());
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part(x int) partitioned by (p1 date,p2 timestamp)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (p1 date,p2 timestamp)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{1}).commit("p1='2018-08-08',p2='2018-08-08 08:08:08'");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
@@ -314,12 +311,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{3}).commit("p1='2018-08-10',p2='2018-08-08 08:08:10'");
 
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
-					hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
-			tableEnv.registerCatalog(catalog.getName(), catalog);
-			tableEnv.useCatalog(catalog.getName());
-
 			Table query = tableEnv.sqlQuery(
 					"select x from db1.part where p1>cast('2018-08-09' as date) and p2<>cast('2018-08-08 08:08:09' as timestamp)");
 			String[] explain = query.explain().split("==.*==\n");
@@ -330,14 +321,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			assertEquals("[3]", results.toString());
 			System.out.println(results);
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testProjectionPushDown() throws Exception {
-		hiveShell.execute("create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
-		final String catalogName = "hive";
+		TableEnvironment tableEnv = createTableEnv();
+		tableEnv.executeSql("create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{1, "a"})
@@ -346,8 +337,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{3, "c"})
 					.commit("p1=2014, p2='2014'");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(catalogName, hiveCatalog);
 			Table table = tableEnv.sqlQuery("select p1, count(y) from hive.`default`.src group by p1");
 			String[] explain = table.explain().split("==.*==\n");
 			assertEquals(4, explain.length);
@@ -363,14 +352,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 			assertArrayEquals(new String[]{"2013,2", "2014,1"}, rowStrings);
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}
 
 	@Test
 	public void testLimitPushDown() throws Exception {
-		hiveShell.execute("create table src (a string)");
-		final String catalogName = "hive";
+		TableEnvironment tableEnv = createTableEnv();
+		tableEnv.executeSql("create table src (a string)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 						.addRow(new Object[]{"a"})
@@ -380,8 +369,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 						.commit();
 			//Add this to obtain correct stats of table to avoid FLINK-14965 problem
 			hiveShell.execute("analyze table src COMPUTE STATISTICS");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(catalogName, hiveCatalog);
 			Table table = tableEnv.sqlQuery("select * from hive.`default`.src limit 1");
 			String[] explain = table.explain().split("==.*==\n");
 			assertEquals(4, explain.length);
@@ -397,17 +384,17 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 			assertArrayEquals(new String[]{"a"}, rowStrings);
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}
 
 	@Test
 	public void testParallelismSetting() {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_parallelism";
-		hiveShell.execute("CREATE TABLE source_db.test_parallelism " +
-				"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test_parallelism " +
+				"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -416,8 +403,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism");
 		PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
@@ -429,11 +414,15 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	@Test
 	public void testParallelismOnLimitPushDown() {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_parallelism_limit_pushdown";
-		hiveShell.execute("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
-					"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.getConfig().getConfiguration().setBoolean(
+				HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+		tEnv.getConfig().getConfiguration().setInteger(
+				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+		tEnv.executeSql("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
+					"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 					.addRow(new Object[]{"2014", 3})
 					.addRow(new Object[]{"2014", 4})
@@ -442,12 +431,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 					.addRow(new Object[]{"2015", 2})
 					.addRow(new Object[]{"2015", 5})
 					.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.getConfig().getConfiguration().setBoolean(
-			HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-		tEnv.getConfig().getConfiguration().setInteger(
-			ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
 		PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
@@ -480,7 +463,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "stream_test";
-		hiveShell.execute("CREATE TABLE source_db.stream_test (" +
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		tEnv.useCatalog(catalogName);
+		tEnv.executeSql("CREATE TABLE source_db.stream_test (" +
 				" a INT," +
 				" b STRING" +
 				") PARTITIONED BY (ts STRING) TBLPROPERTIES (" +
@@ -493,9 +480,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{0, "0"})
 				.commit("ts='2020-05-06 00:00:00'");
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.from("hive.source_db.stream_test");
 
 		TestingAppendRowDataSink sink = new TestingAppendRowDataSink(new RowDataTypeInfo(
@@ -553,8 +537,13 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	private void testNonPartitionStreamingSource(Boolean useMapredReader, String tblName) throws Exception {
 		final String catalogName = "hive";
-		final String dbName = "source_db";
-		hiveShell.execute("CREATE TABLE source_db." + tblName + " (" +
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+		tEnv.getConfig().getConfiguration().setBoolean(
+				HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		tEnv.useCatalog(catalogName);
+		tEnv.executeSql("CREATE TABLE source_db." + tblName + " (" +
 				"  a INT," +
 				"  b CHAR(1) " +
 				") stored as parquet TBLPROPERTIES (" +
@@ -562,12 +551,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				"  'streaming-source.monitor-interval'='100ms'" +
 				")");
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
-		tEnv.getConfig().getConfiguration().setBoolean(
-				HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
-
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db." + tblName);
 
 		TestingAppendSink sink = new TestingAppendSink();
@@ -672,6 +655,13 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				tEnv.executeSql("select * from parquet_t").collect().next());
 	}
 
+	private static TableEnvironment createTableEnv() {
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog("hive", hiveCatalog);
+		tableEnv.useCatalog("hive");
+		return tableEnv;
+	}
+
 	/**
 	 * A sub-class of HiveTableSource to test vector reader switch.
 	 */
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index fa31c8b..2dcd21b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.HiveVersionTestUtil;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -90,12 +91,11 @@ public class TableEnvHiveConnectorTest {
 
 	@Test
 	public void testDefaultPartitionName() throws Exception {
-		hiveShell.execute("create database db1");
-		hiveShell.execute("create table db1.src (x int, y int)");
-		hiveShell.execute("create table db1.part (x int) partitioned by (y int)");
-		HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();
-
 		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
+		tableEnv.executeSql("create table db1.src (x int, y int)");
+		tableEnv.executeSql("create table db1.part (x int) partitioned by (y int)");
+		HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();
 
 		// test generating partitions with default name
 		TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.part select * from db1.src");
@@ -110,21 +110,20 @@ public class TableEnvHiveConnectorTest {
 		List<Row> rows = Lists.newArrayList(flinkTable.execute().collect());
 		assertEquals(Arrays.toString(new String[]{"1,1", "null,2"}), rows.toString());
 
-		hiveShell.execute("drop database db1 cascade");
+		tableEnv.executeSql("drop database db1 cascade");
 	}
 
 	@Test
 	public void testGetNonExistingFunction() throws Exception {
-		hiveShell.execute("create database db1");
-		hiveShell.execute("create table db1.src (d double, s string)");
-		hiveShell.execute("create table db1.dest (x bigint)");
-
 		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
+		tableEnv.executeSql("create table db1.src (d double, s string)");
+		tableEnv.executeSql("create table db1.dest (x bigint)");
 
 		// just make sure the query runs through, no need to verify result
 		TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select count(d) from db1.src");
 
-		hiveShell.execute("drop database db1 cascade");
+		tableEnv.executeSql("drop database db1 cascade");
 	}
 
 	@Test
@@ -142,7 +141,7 @@ public class TableEnvHiveConnectorTest {
 	private void readWriteFormat(String format) throws Exception {
 		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 
-		hiveShell.execute("create database db1");
+		tableEnv.executeSql("create database db1");
 
 		// create source and dest tables
 		String suffix;
@@ -164,9 +163,9 @@ public class TableEnvHiveConnectorTest {
 			tableSchema = "(i int,s string,ts timestamp)";
 		}
 
-		hiveShell.execute(String.format(
+		tableEnv.executeSql(String.format(
 				"create table db1.src %s partitioned by (p1 string, p2 timestamp) %s", tableSchema, suffix));
-		hiveShell.execute(String.format(
+		tableEnv.executeSql(String.format(
 				"create table db1.dest %s partitioned by (p1 string, p2 timestamp) %s", tableSchema, suffix));
 
 		// prepare source data with Hive
@@ -194,7 +193,7 @@ public class TableEnvHiveConnectorTest {
 		// verify data on hive side
 		verifyHiveQueryResult("select * from db1.dest", expected);
 
-		hiveShell.execute("drop database db1 cascade");
+		tableEnv.executeSql("drop database db1 cascade");
 	}
 
 	private String toRowValue(List<Object> row) {
@@ -209,16 +208,16 @@ public class TableEnvHiveConnectorTest {
 
 	@Test
 	public void testDecimal() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src1 (x decimal(10,2))");
-			hiveShell.execute("create table db1.src2 (x decimal(10,2))");
-			hiveShell.execute("create table db1.dest (x decimal(10,2))");
+			tableEnv.executeSql("create table db1.src1 (x decimal(10,2))");
+			tableEnv.executeSql("create table db1.src2 (x decimal(10,2))");
+			tableEnv.executeSql("create table db1.dest (x decimal(10,2))");
 			// populate src1 from Hive
 			// TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
 			hiveShell.execute("insert into table db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
 
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			// populate src2 with same data from Flink
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
 					"(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
@@ -229,24 +228,25 @@ public class TableEnvHiveConnectorTest {
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src1");
 			verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testInsertOverwrite() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
 			// non-partitioned
-			hiveShell.execute("create table db1.dest (x int, y string)");
+			tableEnv.executeSql("create table db1.dest (x int, y string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "dest").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest values (3, 'c')");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));
 
 			// static partition
-			hiveShell.execute("create table db1.part(x int) partitioned by (y int)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (y int)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("y=1");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("y=2");
 			tableEnv = getTableEnvWithHiveCatalog();
@@ -259,67 +259,68 @@ public class TableEnvHiveConnectorTest {
 			// only overwrite dynamically matched partitions, other existing partitions remain intact
 			verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testStaticPartition() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int)");
+			tableEnv.executeSql("create table db1.src (x int)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1}).addRow(new Object[]{2}).commit();
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src");
 			assertEquals(1, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testDynamicPartition() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int, y string, z double)");
+			tableEnv.executeSql("create table db1.src (x int, y string, z double)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a", 1.1})
 					.addRow(new Object[]{2, "a", 2.2})
 					.addRow(new Object[]{3, "b", 3.3})
 					.commit();
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
 			assertEquals(3, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testPartialDynamicPartition() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int, y string)");
+			tableEnv.executeSql("create table db1.src (x int, y string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest partition (p1=1.1) select x,y from db1.src");
 			assertEquals(2, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testDateTimestampPartitionColumns() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part(x int) partitioned by (dt date,ts timestamp)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (dt date,ts timestamp)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{1})
 					.addRow(new Object[]{2})
@@ -327,7 +328,6 @@ public class TableEnvHiveConnectorTest {
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{3})
 					.commit("dt='2019-12-25',ts='2019-12-25 16:23:43.012'");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part order by x").execute().collect());
 			assertEquals("[1,2019-12-23,2019-12-23T00:00, 2,2019-12-23,2019-12-23T00:00, 3,2019-12-25,2019-12-25T16:23:43.012]", results.toString());
 
@@ -338,7 +338,7 @@ public class TableEnvHiveConnectorTest {
 			results = Lists.newArrayList(tableEnv.sqlQuery("select max(dt) from db1.part").execute().collect());
 			assertEquals("[2019-12-31]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
@@ -351,11 +351,12 @@ public class TableEnvHiveConnectorTest {
 		// Therefore disable such tests for older Hive versions.
 		String hiveVersion = HiveShimLoader.getHiveVersion();
 		Assume.assumeTrue(hiveVersion.compareTo("2.0.0") >= 0 || hiveVersion.compareTo("1.3.0") >= 0);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.simple (i int,a array<int>)");
-			hiveShell.execute("create table db1.nested (a array<map<int, string>>)");
-			hiveShell.execute("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+			tableEnv.executeSql("create table db1.simple (i int,a array<int>)");
+			tableEnv.executeSql("create table db1.nested (a array<map<int, string>>)");
+			tableEnv.executeSql("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
 			hiveShell.insertInto("db1", "simple").addRow(3, Arrays.asList(1, 2, 3)).commit();
 			Map<Integer, String> map1 = new HashMap<>();
 			map1.put(1, "a");
@@ -364,7 +365,6 @@ public class TableEnvHiveConnectorTest {
 			map2.put(3, "c");
 			hiveShell.insertInto("db1", "nested").addRow(Arrays.asList(map1, map2)).commit();
 
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(
 					tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)").execute().collect());
 			assertEquals("[1, 2, 3]", results.toString());
@@ -372,7 +372,7 @@ public class TableEnvHiveConnectorTest {
 					tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)").execute().collect());
 			assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());
 
-			hiveShell.execute("create table db1.ts (a array<timestamp>)");
+			tableEnv.executeSql("create table db1.ts (a array<timestamp>)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "ts").addRow(new Object[]{
 					new Object[]{Timestamp.valueOf("2015-04-28 15:23:00"), Timestamp.valueOf("2016-06-03 17:05:52")}})
 					.commit();
@@ -380,17 +380,18 @@ public class TableEnvHiveConnectorTest {
 					tableEnv.sqlQuery("select x from db1.ts, lateral table(hiveudtf(a)) as T(x)").execute().collect());
 			assertEquals("[2015-04-28T15:23, 2016-06-03T17:05:52]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
-			hiveShell.execute("drop function hiveudtf");
+			tableEnv.executeSql("drop database db1 cascade");
+			tableEnv.executeSql("drop function hiveudtf");
 		}
 	}
 
 	@Test
 	public void testNotNullConstraints() throws Exception {
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.tbl (x int,y bigint not null enable rely,z string not null enable norely)");
+			tableEnv.executeSql("create table db1.tbl (x int,y bigint not null enable rely,z string not null enable norely)");
 			CatalogBaseTable catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl"));
 			TableSchema tableSchema = catalogTable.getSchema();
 			assertTrue("By default columns should be nullable",
@@ -400,7 +401,7 @@ public class TableEnvHiveConnectorTest {
 			assertTrue("NOT NULL NORELY columns should be considered nullable",
 					tableSchema.getFieldDataTypes()[2].getLogicalType().isNullable());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
@@ -409,10 +410,11 @@ public class TableEnvHiveConnectorTest {
 		// While PK constraints are supported since Hive 2.1.0, the constraints cannot be RELY in 2.x versions.
 		// So let's only test for 3.x.
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
 			// test rely PK constraints
-			hiveShell.execute("create table db1.tbl1 (x tinyint,y smallint,z int, primary key (x,z) disable novalidate rely)");
+			tableEnv.executeSql("create table db1.tbl1 (x tinyint,y smallint,z int, primary key (x,z) disable novalidate rely)");
 			CatalogBaseTable catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl1"));
 			TableSchema tableSchema = catalogTable.getSchema();
 			assertTrue(tableSchema.getPrimaryKey().isPresent());
@@ -421,32 +423,32 @@ public class TableEnvHiveConnectorTest {
 			assertTrue(pk.getColumns().containsAll(Arrays.asList("x", "z")));
 
 			// test norely PK constraints
-			hiveShell.execute("create table db1.tbl2 (x tinyint,y smallint, primary key (x) disable norely)");
+			tableEnv.executeSql("create table db1.tbl2 (x tinyint,y smallint, primary key (x) disable norely)");
 			catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl2"));
 			tableSchema = catalogTable.getSchema();
 			assertFalse(tableSchema.getPrimaryKey().isPresent());
 
 			// test table w/o PK
-			hiveShell.execute("create table db1.tbl3 (x tinyint)");
+			tableEnv.executeSql("create table db1.tbl3 (x tinyint)");
 			catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl3"));
 			tableSchema = catalogTable.getSchema();
 			assertFalse(tableSchema.getPrimaryKey().isPresent());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testTimestamp() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (ts timestamp)");
-			hiveShell.execute("create table db1.dest (ts timestamp)");
+			tableEnv.executeSql("create table db1.src (ts timestamp)");
+			tableEnv.executeSql("create table db1.dest (ts timestamp)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{Timestamp.valueOf("2019-11-11 00:00:00")})
 					.addRow(new Object[]{Timestamp.valueOf("2019-12-03 15:43:32.123456789")})
 					.commit();
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			// test read timestamp from hive
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
 			assertEquals(2, results.size());
@@ -456,21 +458,21 @@ public class TableEnvHiveConnectorTest {
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select max(ts) from db1.src");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-03 15:43:32.123456789"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testDate() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (dt date)");
-			hiveShell.execute("create table db1.dest (dt date)");
+			tableEnv.executeSql("create table db1.src (dt date)");
+			tableEnv.executeSql("create table db1.dest (dt date)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{Date.valueOf("2019-12-09")})
 					.addRow(new Object[]{Date.valueOf("2019-12-12")})
 					.commit();
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			// test read date from hive
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
 			assertEquals(2, results.size());
@@ -480,15 +482,16 @@ public class TableEnvHiveConnectorTest {
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select max(dt) from db1.src");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-12"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testViews() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (key int,val string)");
+			tableEnv.executeSql("create table db1.src (key int,val string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a"})
 					.addRow(new Object[]{1, "aa"})
@@ -497,7 +500,7 @@ public class TableEnvHiveConnectorTest {
 					.addRow(new Object[]{3, "c"})
 					.addRow(new Object[]{3, "ccc"})
 					.commit();
-			hiveShell.execute("create table db1.keys (key int,name string)");
+			tableEnv.executeSql("create table db1.keys (key int,name string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "keys")
 					.addRow(new Object[]{1, "key1"})
 					.addRow(new Object[]{2, "key2"})
@@ -507,7 +510,6 @@ public class TableEnvHiveConnectorTest {
 			hiveShell.execute("create view db1.v1 as select key as k,val as v from db1.src limit 2");
 			hiveShell.execute("create view db1.v2 as select key,count(*) from db1.src group by key having count(*)>1 order by key");
 			hiveShell.execute("create view db1.v3 as select k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by k.key,k.name order by k.key");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select count(v) from db1.v1").execute().collect());
 			assertEquals("[2]", results.toString());
 			results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v2").execute().collect());
@@ -515,16 +517,16 @@ public class TableEnvHiveConnectorTest {
 			results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v3").execute().collect());
 			assertEquals("[1,key1,3, 2,key2,1, 3,key3,2]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testWhitespacePartValue() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p string)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p string)");
 			StatementSet stmtSet = tableEnv.createStatementSet();
 			stmtSet.addInsertSql("insert into db1.dest select 1,'  '");
 			stmtSet.addInsertSql("insert into db1.dest select 2,'a \t'");
@@ -533,29 +535,29 @@ public class TableEnvHiveConnectorTest {
 			tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
 			assertEquals("[p=  , p=a %09]", hiveShell.executeQuery("show partitions db1.dest").toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	private void testCompressTextTable(boolean batch) throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = batch ?
+				getTableEnvWithHiveCatalog() :
+				getStreamTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x string,y string)");
+			tableEnv.executeSql("create table db1.src (x string,y string)");
 			hiveShell.execute("create table db1.dest like db1.src");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{"a", "b"})
 					.addRow(new Object[]{"c", "d"})
 					.commit();
 			hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
-			TableEnvironment tableEnv = batch ?
-					getTableEnvWithHiveCatalog() :
-					getStreamTableEnvWithHiveCatalog();
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
 			List<String> expected = Arrays.asList("a\tb", "c\td");
 			verifyHiveQueryResult("select * from db1.dest", expected);
 			verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.dest"), expected);
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
@@ -571,52 +573,52 @@ public class TableEnvHiveConnectorTest {
 
 	@Test
 	public void testRegexSerDe() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int,y string) " +
+			tableEnv.executeSql("create table db1.src (x int,y string) " +
 					"row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' " +
-					"with serdeproperties ('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
+					"with serdeproperties ('input.regex'='([\\d]+)\\u0001([\\S]+)')");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a"})
 					.addRow(new Object[]{2, "ab"})
 					.commit();
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			assertEquals("[1,a, 2,ab]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src order by x").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testUpdatePartitionSD() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p string) stored as rcfile");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p string) stored as rcfile");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest partition (p='1') select 1");
-			hiveShell.execute("alter table db1.dest set fileformat sequencefile");
+			tableEnv.executeSql("alter table db1.dest set fileformat sequencefile");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest partition (p='1') select 1");
 			assertEquals("[1,1]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.dest").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testParquetNameMapping() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.t1 (x int,y int) stored as parquet");
-			hiveShell.execute("insert into table db1.t1 values (1,10),(2,20)");
+			tableEnv.executeSql("create table db1.t1 (x int,y int) stored as parquet");
+			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into table db1.t1 values (1,10),(2,20)");
 			Table hiveTable = hiveCatalog.getHiveTable(new ObjectPath("db1", "t1"));
 			String location = hiveTable.getSd().getLocation();
-			hiveShell.execute(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
 			tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
 			assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t1").execute().collect()).toString());
 			assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t2").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
@@ -625,47 +627,47 @@ public class TableEnvHiveConnectorTest {
 		// not supported until 2.1.0 -- https://issues.apache.org/jira/browse/HIVE-11981,
 		// https://issues.apache.org/jira/browse/HIVE-13178
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x smallint,y int) stored as orc");
+			tableEnv.executeSql("create table db1.src (x smallint,y int) stored as orc");
 			hiveShell.execute("insert into table db1.src values (1,100),(2,200)");
 
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
 
-			hiveShell.execute("alter table db1.src change x x int");
+			tableEnv.executeSql("alter table db1.src change x x int");
 			assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());
 
-			hiveShell.execute("alter table db1.src change y y string");
+			tableEnv.executeSql("alter table db1.src change y y string");
 			assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	@Test
 	public void testNonExistingPartitionFolder() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part (x int) partitioned by (p int)");
+			tableEnv.executeSql("create table db1.part (x int) partitioned by (p int)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("p=1");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("p=2");
-			hiveShell.execute("alter table db1.part add partition (p=3)");
+			tableEnv.executeSql("alter table db1.part add partition (p=3)");
 			// remove one partition
 			Path toRemove = new Path(hiveCatalog.getHiveTable(new ObjectPath("db1", "part")).getSd().getLocation(), "p=2");
 			FileSystem fs = toRemove.getFileSystem(hiveShell.getHiveConf());
 			fs.delete(toRemove, true);
 
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part").execute().collect());
 			assertEquals("[1,1]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
 
 	private TableEnvironment getTableEnvWithHiveCatalog() {
-		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
 		tableEnv.useCatalog(hiveCatalog.getName());
 		return tableEnv;
@@ -673,7 +675,7 @@ public class TableEnvHiveConnectorTest {
 
 	private TableEnvironment getStreamTableEnvWithHiveCatalog() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
 		tableEnv.useCatalog(hiveCatalog.getName());
 		return tableEnv;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index d3c6ed4..4ecf3f6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
@@ -251,51 +252,47 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 
 	@Test
 	public void testTimestampUDF() throws Exception {
-		hiveCatalog.createFunction(new ObjectPath("default", "myyear"),
-				new CatalogFunctionImpl(UDFYear.class.getCanonicalName()),
-				false);
 
-		hiveShell.execute("create table src(ts timestamp)");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+		tableEnv.executeSql(String.format("create function myyear as '%s'", UDFYear.class.getName()));
+		tableEnv.executeSql("create table src(ts timestamp)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{Timestamp.valueOf("2013-07-15 10:00:00")})
 					.addRow(new Object[]{Timestamp.valueOf("2019-05-23 17:32:55")})
 					.commit();
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-			tableEnv.useCatalog(hiveCatalog.getName());
 
 			List<Row> results = Lists.newArrayList(
 					tableEnv.sqlQuery("select myyear(ts) as y from src").execute().collect());
 			Assert.assertEquals(2, results.size());
 			Assert.assertEquals("[2013, 2019]", results.toString());
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}
 
 	@Test
 	public void testDateUDF() throws Exception {
-		hiveCatalog.createFunction(new ObjectPath("default", "mymonth"),
-				new CatalogFunctionImpl(UDFMonth.class.getCanonicalName()),
-				false);
 
-		hiveShell.execute("create table src(dt date)");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+		tableEnv.executeSql(String.format("create function mymonth as '%s'", UDFMonth.class.getName()));
+		tableEnv.executeSql("create table src(dt date)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{Date.valueOf("2019-01-19")})
 					.addRow(new Object[]{Date.valueOf("2019-03-02")})
 					.commit();
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-			tableEnv.useCatalog(hiveCatalog.getName());
 
 			List<Row> results = Lists.newArrayList(
 					tableEnv.sqlQuery("select mymonth(dt) as m from src order by m").execute().collect());
 			Assert.assertEquals(2, results.size());
 			Assert.assertEquals("[1, 3]", results.toString());
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 252c6e1..57cbd8f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -113,17 +114,28 @@ public class HiveTestUtils {
 	}
 
 	public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
+		return createTableEnvWithBlinkPlannerBatchMode(SqlDialect.DEFAULT);
+	}
+
+	public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode(SqlDialect dialect) {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
 		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+		tableEnv.getConfig().setSqlDialect(dialect);
 		return tableEnv;
 	}
 
 	public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode(
 			StreamExecutionEnvironment env) {
+		return createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.DEFAULT);
+	}
+
+	public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode(
+			StreamExecutionEnvironment env, SqlDialect dialect) {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+		tableEnv.getConfig().setSqlDialect(dialect);
 		return tableEnv;
 	}
 


[flink] 04/04: [hotfix] Remove generic row for HiveTableFactory

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f923867769feb2a6461191ad8656da6cb84002a
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon May 25 21:42:25 2020 +0800

    [hotfix] Remove generic row for HiveTableFactory
    
    This closes #12324
---
 .../main/java/org/apache/flink/connectors/hive/HiveTableFactory.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 01b16c5..0bb78b8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A table factory implementation for Hive catalog.
  */
 public class HiveTableFactory
-		implements TableSourceFactory<RowData>, TableSinkFactory<Row> {
+		implements TableSourceFactory<RowData>, TableSinkFactory {
 
 	private final HiveConf hiveConf;
 
@@ -79,7 +78,7 @@ public class HiveTableFactory
 	}
 
 	@Override
-	public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
+	public TableSink createTableSink(TableSinkFactory.Context context) {
 		CatalogTable table = checkNotNull(context.getTable());
 		Preconditions.checkArgument(table instanceof CatalogTableImpl);
 


[flink] 02/04: [FLINK-17867][hive][test] Add hdfs dependency to hive-3.1.1 test

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52b7e87d0945e460588d24cbcdba2019281d2200
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 25 17:10:06 2020 +0800

    [FLINK-17867][hive][test] Add hdfs dependency to hive-3.1.1 test
    
    This closes #12318
---
 flink-connectors/flink-connector-hive/pom.xml | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 0627f25..fe261c6 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -1105,6 +1105,14 @@ under the License.
 					<version>4.1.44.Final</version>
 					<scope>provided</scope>
 				</dependency>
+
+				<!-- Required by orc tests -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>${hivemetastore.hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
 			</dependencies>
 
 		</profile>