You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/09 16:17:13 UTC

[flink] branch master updated: [FLINK-13090][hive] Test Hive connector with hive runner

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d1e64e  [FLINK-13090][hive] Test Hive connector with hive runner
5d1e64e is described below

commit 5d1e64eda1c4dc9b3563641d393bf1953e4ac06b
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jul 4 16:34:55 2019 +0800

    [FLINK-13090][hive] Test Hive connector with hive runner
    
    This PR uses hive runner in our hive connector tests.
    
    This closes #8987.
---
 flink-connectors/flink-connector-hive/pom.xml      |  99 ++++-
 .../table/catalog/hive/client/HiveShimLoader.java  |   4 +-
 .../connectors/hive/FlinkStandaloneHiveRunner.java | 464 +++++++++++++++++++++
 .../hive/FlinkStandaloneHiveServerContext.java     | 222 ++++++++++
 .../batch/connectors/hive/HiveRunnerShim.java      |  33 ++
 .../connectors/hive/HiveRunnerShimLoader.java      |  49 +++
 .../batch/connectors/hive/HiveRunnerShimV3.java    |  39 ++
 .../batch/connectors/hive/HiveRunnerShimV4.java    |  39 ++
 .../batch/connectors/hive/HiveTableSinkTest.java   |  69 ++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |  25 ++
 10 files changed, 992 insertions(+), 51 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 3b549b6..5ceab9d 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -35,6 +35,11 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<properties>
+		<hiverunner.version>4.0.0</hiverunner.version>
+		<reflections.version>0.9.8</reflections.version>
+	</properties>
+
 	<dependencies>
 
 		<!-- core dependencies -->
@@ -260,10 +265,6 @@ under the License.
 				</exclusion>
 				<exclusion>
 					<groupId>org.apache.logging.log4j</groupId>
-					<artifactId>log4j-1.2-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.logging.log4j</groupId>
 					<artifactId>log4j-slf4j-impl</artifactId>
 				</exclusion>
 				<exclusion>
@@ -373,6 +374,95 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+            <groupId>com.klarna</groupId>
+            <artifactId>hiverunner</artifactId>
+            <version>${hiverunner.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-serde</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive.hcatalog</groupId>
+                    <artifactId>hive-webhcat-java-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-service</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-contrib</artifactId>
+                </exclusion>
+				<exclusion>
+					<groupId>org.apache.tez</groupId>
+					<artifactId>tez-dag</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.tez</groupId>
+					<artifactId>tez-common</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.tez</groupId>
+					<artifactId>tez-mapreduce</artifactId>
+				</exclusion>
+            </exclusions>
+        </dependency>
+
+		<!--  We have 0.9.10 in dependency management but hiverunner requires 0.9.8, so need to explicitly specify it here -->
+		<dependency>
+			<groupId>org.reflections</groupId>
+			<artifactId>reflections</artifactId>
+			<version>${reflections.version}</version>
+			<scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-service</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.hive</groupId>
+					<artifactId>hive-exec</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.hive</groupId>
+					<artifactId>hive-metastore</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+        </dependency>
+
+		<dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+        </dependency>
+
 	</dependencies>
 
 	<build>
@@ -440,6 +530,7 @@ under the License.
 			<properties>
 				<hive.version>1.2.1</hive.version>
 				<hivemetastore.hadoop.version>2.6.5</hivemetastore.hadoop.version>
+				<hiverunner.version>3.2.1</hiverunner.version>
 			</properties>
 		</profile>
 	</profiles>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
index 3a7bd5f..771cfc0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
@@ -32,8 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class HiveShimLoader {
 
-	private static final String HIVE_V1_VERSION_NAME = "1.2.1";
-	private static final String HIVE_V2_VERSION_NAME = "2.3.4";
+	public static final String HIVE_V1_VERSION_NAME = "1.2.1";
+	public static final String HIVE_V2_VERSION_NAME = "2.3.4";
 
 	private static final Map<String, HiveShim> hiveShims = new ConcurrentHashMap<>(2);
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveRunner.java
new file mode 100644
index 0000000..1df9904
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
+import org.apache.flink.shaded.guava18.com.google.common.base.Splitter;
+import org.apache.flink.shaded.guava18.com.google.common.base.Throwables;
+import org.apache.flink.shaded.guava18.com.google.common.io.Resources;
+
+import com.klarna.hiverunner.HiveServerContainer;
+import com.klarna.hiverunner.HiveServerContext;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.HiveShellContainer;
+import com.klarna.hiverunner.annotations.HiveProperties;
+import com.klarna.hiverunner.annotations.HiveResource;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import com.klarna.hiverunner.annotations.HiveSetupScript;
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import com.klarna.reflection.ReflectionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.junit.Ignore;
+import org.junit.internal.AssumptionViolatedException;
+import org.junit.internal.runners.model.EachTestNotifier;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.InitializationError;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.net.www.ParseUtil;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEHISTORYFILELOC;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.SCRATCHDIR;
+import static org.reflections.ReflectionUtils.withAnnotation;
+
+/**
+ * JUnit 4 runner that runs hive sql on a HiveServer residing in this JVM. No external dependencies needed.
+ * Inspired by StandaloneHiveRunner.java (almost copied), just using local meta store server instead of embedded
+ * hive meta store.
+ */
+public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
+	private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStandaloneHiveRunner.class);
+	private static final Duration HMS_START_TIMEOUT = Duration.ofSeconds(30);
+	private Future<Void> hmsWatcher;
+	private int hmsPort;
+	private HiveShellContainer container;
+	private HiveRunnerConfig config = new HiveRunnerConfig();
+
+	public FlinkStandaloneHiveRunner(Class<?> clazz) throws InitializationError {
+		super(clazz);
+	}
+
+	@Override
+	protected List<TestRule> classRules() {
+		final TemporaryFolder temporaryFolder = new TemporaryFolder();
+		try {
+			hmsPort = HiveTestUtils.getFreePort();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		HiveServerContext context = new FlinkStandaloneHiveServerContext(temporaryFolder, config, hmsPort);
+		List<TestRule> rules = super.classRules();
+		ExternalResource hms = new ExternalResource() {
+			@Override
+			protected void before() throws Throwable {
+				LOGGER.info("Setting up {} in {}", getName(), temporaryFolder.getRoot().getAbsolutePath());
+				hmsWatcher = startHMS(context, hmsPort);
+			}
+
+			@Override
+			protected void after() {
+				if (hmsWatcher != null) {
+					hmsWatcher.cancel(true);
+				}
+			}
+		};
+		ExternalResource hiveShell = new ExternalResource() {
+			@Override
+			protected void before() throws Throwable {
+				container = createHiveServerContainer(getTestClass().getJavaClass(), context);
+			}
+
+			@Override
+			protected void after() {
+				tearDown();
+			}
+		};
+		rules.add(hiveShell);
+		rules.add(hms);
+		rules.add(temporaryFolder);
+		return rules;
+	}
+
+	@Override
+	protected void runChild(final FrameworkMethod method, RunNotifier notifier) {
+		Description description = describeChild(method);
+		if (method.getAnnotation(Ignore.class) != null) {
+			notifier.fireTestIgnored(description);
+		} else {
+			EachTestNotifier eachNotifier = new EachTestNotifier(notifier, description);
+			eachNotifier.fireTestStarted();
+			try {
+				runTestMethod(method, eachNotifier);
+			} finally {
+				eachNotifier.fireTestFinished();
+			}
+		}
+	}
+
+	/**
+	 * Runs a {@link Statement} that represents a leaf (aka atomic) test.
+	 */
+	private void runTestMethod(FrameworkMethod method,
+			EachTestNotifier notifier) {
+		Statement statement = methodBlock(method);
+
+		try {
+			statement.evaluate();
+		} catch (AssumptionViolatedException e) {
+			notifier.addFailedAssumption(e);
+		} catch (Throwable e) {
+			notifier.addFailure(e);
+		}
+	}
+
+	private void tearDown() {
+		if (container != null) {
+			LOGGER.info("Tearing down {}", getName());
+			try {
+				container.tearDown();
+			} catch (Throwable e) {
+				LOGGER.warn("Tear down failed: " + e.getMessage(), e);
+			}
+		}
+	}
+
+	/**
+	 * Traverses the test class annotations. Will inject a HiveShell in the test case that envelopes the HiveServer.
+	 */
+	private HiveShellContainer createHiveServerContainer(final Class testClass, HiveServerContext context)
+			throws Exception {
+
+		final HiveServerContainer hiveServerContainer = new HiveServerContainer(context);
+
+		HiveShellBuilder hiveShellBuilder = new HiveShellBuilder();
+		HiveRunnerShim hiveRunnerShim = HiveRunnerShimLoader.load();
+		hiveRunnerShim.setCommandShellEmulation(hiveShellBuilder, config);
+
+		HiveShellField shellSetter = loadScriptsUnderTest(testClass, hiveShellBuilder);
+
+		hiveShellBuilder.setHiveServerContainer(hiveServerContainer);
+
+		loadAnnotatedResources(testClass, hiveShellBuilder);
+
+		loadAnnotatedProperties(testClass, hiveShellBuilder);
+
+		loadAnnotatedSetupScripts(testClass, hiveShellBuilder);
+
+		// Build shell
+		final HiveShellContainer shell = hiveShellBuilder.buildShell();
+
+		// Set shell
+		shellSetter.setShell(shell);
+
+		if (shellSetter.isAutoStart()) {
+			shell.start();
+		}
+
+		return shell;
+	}
+
+	private HiveShellField loadScriptsUnderTest(final Class testClass, HiveShellBuilder hiveShellBuilder) {
+		try {
+			Set<Field> fields = ReflectionUtils.getAllFields(testClass, withAnnotation(HiveSQL.class));
+
+			Preconditions.checkState(fields.size() == 1, "Exactly one field should to be annotated with @HiveSQL");
+
+			final Field field = fields.iterator().next();
+			List<Path> scripts = new ArrayList<>();
+			HiveSQL annotation = field.getAnnotation(HiveSQL.class);
+			for (String scriptFilePath : annotation.files()) {
+				Path file = Paths.get(Resources.getResource(scriptFilePath).toURI());
+				Preconditions.checkState(Files.exists(file), "File " + file + " does not exist");
+				scripts.add(file);
+			}
+
+			Charset charset = annotation.encoding().equals("") ?
+					Charset.defaultCharset() : Charset.forName(annotation.encoding());
+
+			final boolean isAutoStart = annotation.autoStart();
+
+			hiveShellBuilder.setScriptsUnderTest(scripts, charset);
+
+			return new HiveShellField() {
+				@Override
+				public void setShell(HiveShell shell) {
+					ReflectionUtils.setStaticField(testClass, field.getName(), shell);
+				}
+
+				@Override
+				public boolean isAutoStart() {
+					return isAutoStart;
+				}
+			};
+		} catch (Throwable t) {
+			throw new IllegalArgumentException("Failed to init field annotated with @HiveSQL: " + t.getMessage(), t);
+		}
+	}
+
+	private void loadAnnotatedSetupScripts(Class testClass, HiveShellBuilder hiveShellBuilder) {
+		Set<Field> setupScriptFields = ReflectionUtils.getAllFields(testClass, withAnnotation(HiveSetupScript.class));
+		for (Field setupScriptField : setupScriptFields) {
+			if (ReflectionUtils.isOfType(setupScriptField, String.class)) {
+				String script = ReflectionUtils.getStaticFieldValue(testClass, setupScriptField.getName(), String.class);
+				hiveShellBuilder.addSetupScript(script);
+			} else if (ReflectionUtils.isOfType(setupScriptField, File.class) ||
+					ReflectionUtils.isOfType(setupScriptField, Path.class)) {
+				Path path = getMandatoryPathFromField(testClass, setupScriptField);
+				hiveShellBuilder.addSetupScript(readAll(path));
+			} else {
+				throw new IllegalArgumentException(
+						"Field annotated with @HiveSetupScript currently only supports type String, File and Path");
+			}
+		}
+	}
+
+	private static String readAll(Path path) {
+		try {
+			return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+		} catch (IOException e) {
+			throw new IllegalStateException("Unable to read " + path + ": " + e.getMessage(), e);
+		}
+	}
+
+	private void loadAnnotatedResources(Class testClass, HiveShellBuilder workFlowBuilder) throws IOException {
+		Set<Field> fields = ReflectionUtils.getAllFields(testClass, withAnnotation(HiveResource.class));
+
+		for (Field resourceField : fields) {
+
+			HiveResource annotation = resourceField.getAnnotation(HiveResource.class);
+			String targetFile = annotation.targetFile();
+
+			if (ReflectionUtils.isOfType(resourceField, String.class)) {
+				String data = ReflectionUtils.getStaticFieldValue(testClass, resourceField.getName(), String.class);
+				workFlowBuilder.addResource(targetFile, data);
+			} else if (ReflectionUtils.isOfType(resourceField, File.class) ||
+					ReflectionUtils.isOfType(resourceField, Path.class)) {
+				Path dataFile = getMandatoryPathFromField(testClass, resourceField);
+				workFlowBuilder.addResource(targetFile, dataFile);
+			} else {
+				throw new IllegalArgumentException(
+						"Fields annotated with @HiveResource currently only supports field type String, File or Path");
+			}
+		}
+	}
+
+	private Path getMandatoryPathFromField(Class testClass, Field resourceField) {
+		Path path;
+		if (ReflectionUtils.isOfType(resourceField, File.class)) {
+			File dataFile = ReflectionUtils.getStaticFieldValue(testClass, resourceField.getName(), File.class);
+			path = Paths.get(dataFile.toURI());
+		} else if (ReflectionUtils.isOfType(resourceField, Path.class)) {
+			path = ReflectionUtils.getStaticFieldValue(testClass, resourceField.getName(), Path.class);
+		} else {
+			throw new IllegalArgumentException(
+					"Only Path or File type is allowed on annotated field " + resourceField);
+		}
+
+		Preconditions.checkArgument(Files.exists(path), "File %s does not exist", path);
+		return path;
+	}
+
+	private void loadAnnotatedProperties(Class testClass, HiveShellBuilder workFlowBuilder) {
+		for (Field hivePropertyField : ReflectionUtils.getAllFields(testClass, withAnnotation(HiveProperties.class))) {
+			Preconditions.checkState(ReflectionUtils.isOfType(hivePropertyField, Map.class),
+					"Field annotated with @HiveProperties should be of type Map<String, String>");
+			workFlowBuilder.putAllProperties(
+					ReflectionUtils.getStaticFieldValue(testClass, hivePropertyField.getName(), Map.class));
+		}
+	}
+
+	/**
+	 * Used as a handle for the HiveShell field in the test case so that we may set it once the
+	 * HiveShell has been instantiated.
+	 */
+	interface HiveShellField {
+		void setShell(HiveShell shell);
+
+		boolean isAutoStart();
+	}
+
+	/**
+	 * Launches HMS process and returns a Future representing that process.
+	 */
+	private static Future<Void> startHMS(HiveServerContext context, int port) throws Exception {
+		context.init();
+		HiveConf outsideConf = context.getHiveConf();
+		List<String> args = new ArrayList<>();
+		String javaHome = System.getProperty("java.home");
+		args.add(Joiner.on(File.separator).join(javaHome, "bin", "java"));
+		// set classpath
+		List<String> cpElements = new ArrayList<>();
+		String classpath = System.getProperty("java.class.path");
+		for (String path : Splitter.on(File.pathSeparator).split(classpath)) {
+			cpElements.add(ParseUtil.encodePath(path));
+		}
+		args.add("-cp");
+		args.add(String.join(File.pathSeparator, cpElements));
+
+		// set sys properties
+		// TODO: generate hive-site.xml at runtime?
+		args.add(hiveCmdLineConfig(METASTOREWAREHOUSE.varname, outsideConf.getVar(METASTOREWAREHOUSE)));
+		args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
+		args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
+		args.add(hiveCmdLineConfig(HIVEHISTORYFILELOC.varname, outsideConf.getVar(HIVEHISTORYFILELOC)));
+		args.add(hiveCmdLineConfig(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS.varname,
+				String.valueOf(outsideConf.getBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS))));
+		args.add(hiveCmdLineConfig("hadoop.tmp.dir", outsideConf.get("hadoop.tmp.dir")));
+		args.add(hiveCmdLineConfig("test.log.dir", outsideConf.get("test.log.dir")));
+		String metaStorageUrl = "jdbc:derby:memory:" + UUID.randomUUID().toString();
+		args.add(hiveCmdLineConfig(METASTORECONNECTURLKEY.varname, metaStorageUrl + ";create=true"));
+		// config derby.log file
+		File derbyLog = File.createTempFile("derby", ".log");
+		derbyLog.deleteOnExit();
+		args.add(hiveCmdLineConfig("derby.stream.error.file", derbyLog.getAbsolutePath()));
+
+		args.add(HiveMetaStore.class.getCanonicalName());
+		args.add("-p");
+		args.add(String.valueOf(port));
+
+		ProcessBuilder builder = new ProcessBuilder(args);
+		Process process = builder.start();
+		Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
+		Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
+		inLogger.setDaemon(true);
+		inLogger.setName("HMS-IN-Logger");
+		errLogger.setDaemon(true);
+		errLogger.setName("HMS-ERR-Logger");
+		inLogger.start();
+		errLogger.start();
+
+		FutureTask<Void> res = new FutureTask<>(() -> {
+			try {
+				int r = process.waitFor();
+				inLogger.join();
+				errLogger.join();
+				if (r != 0) {
+					throw new RuntimeException("HMS process exited with " + r);
+				}
+			} catch (InterruptedException e) {
+				LOGGER.info("Shutting down HMS");
+			} finally {
+				if (process.isAlive()) {
+					// give it a chance to terminate gracefully
+					process.destroy();
+					try {
+						process.waitFor(5, TimeUnit.SECONDS);
+					} catch (InterruptedException e) {
+						LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
+					}
+					process.destroyForcibly();
+				}
+			}
+		}, null);
+		Thread thread = new Thread(res);
+		thread.setName("HMS-Watcher");
+		// we need the watcher thread to kill HMS, don't make it daemon
+		thread.setDaemon(false);
+		thread.start();
+		waitForHMSStart(port);
+		return res;
+	}
+
+	private static void waitForHMSStart(int port) throws Exception {
+		final long deadline = System.currentTimeMillis() + HMS_START_TIMEOUT.toMillis();
+		while (System.currentTimeMillis() < deadline) {
+			try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port))) {
+				LOGGER.info("HMS started at port {}", port);
+				return;
+			} catch (ConnectException e) {
+				LOGGER.info("Waiting for HMS to start...");
+				Thread.sleep(1000);
+			}
+		}
+		throw new java.util.concurrent.TimeoutException("Timeout waiting for HMS to start");
+	}
+
+	private static String hiveCmdLineConfig(String name, String val) {
+		return String.format("-D%s=%s", name, val);
+	}
+
+	private static class LogRedirect implements Runnable {
+		private final InputStream inputStream;
+		private final Logger logger;
+
+		LogRedirect(InputStream inputStream, Logger logger) {
+			this.inputStream = inputStream;
+			this.logger = logger;
+		}
+
+		@Override
+		public void run() {
+			try {
+				new BufferedReader(new InputStreamReader(inputStream)).lines().forEach(logger::info);
+			} catch (Exception e) {
+				logger.error(Throwables.getStackTraceAsString(e));
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveServerContext.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveServerContext.java
new file mode 100644
index 0000000..62ccaae
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveServerContext.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.HiveServerContext;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HADOOPBIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEHISTORYFILELOC;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEMETADATAONLYQUERIES;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTINDEXFILTER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESKEWJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSAUTOGATHER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_CBO_ENABLED;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_COLUMNS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_CONSTRAINTS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_TABLES;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.SCRATCHDIR;
+
+/**
+ * HiveServerContext used by FlinkStandaloneHiveRunner.
+ */
+public class FlinkStandaloneHiveServerContext implements HiveServerContext {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStandaloneHiveServerContext.class);
+
+	private HiveConf hiveConf = new HiveConf();
+
+	private final TemporaryFolder basedir;
+	private final HiveRunnerConfig hiveRunnerConfig;
+	private boolean inited = false;
+	private final int hmsPort;
+
+	FlinkStandaloneHiveServerContext(TemporaryFolder basedir, HiveRunnerConfig hiveRunnerConfig, int hmsPort) {
+		this.basedir = basedir;
+		this.hiveRunnerConfig = hiveRunnerConfig;
+		this.hmsPort = hmsPort;
+	}
+
+	private String toHmsURI() {
+		return "thrift://localhost:" + hmsPort;
+	}
+
+	@Override
+	public final void init() {
+		if (!inited) {
+
+			configureMiscHiveSettings(hiveConf);
+
+			configureMetaStore(hiveConf);
+
+			configureMrExecutionEngine(hiveConf);
+
+			configureJavaSecurityRealm(hiveConf);
+
+			configureSupportConcurrency(hiveConf);
+
+			configureFileSystem(basedir, hiveConf);
+
+			configureAssertionStatus(hiveConf);
+
+			overrideHiveConf(hiveConf);
+		}
+		inited = true;
+	}
+
+	private void configureMiscHiveSettings(HiveConf hiveConf) {
+		hiveConf.setBoolVar(HIVESTATSAUTOGATHER, false);
+
+		// Turn off CBO so we don't depend on calcite
+		hiveConf.setBoolVar(HIVE_CBO_ENABLED, false);
+
+		// Disable to get rid of clean up exception when stopping the Session.
+		hiveConf.setBoolVar(HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
+
+		hiveConf.setVar(HADOOPBIN, "NO_BIN!");
+	}
+
+	private void overrideHiveConf(HiveConf hiveConf) {
+		for (Map.Entry<String, String> hiveConfEntry : hiveRunnerConfig.getHiveConfSystemOverride().entrySet()) {
+			hiveConf.set(hiveConfEntry.getKey(), hiveConfEntry.getValue());
+		}
+	}
+
+	private void configureMrExecutionEngine(HiveConf conf) {
+
+		/*
+		 * Switch off all optimizers otherwise we didn't
+		 * manage to contain the map reduction within this JVM.
+		 */
+		conf.setBoolVar(HIVE_INFER_BUCKET_SORT, false);
+		conf.setBoolVar(HIVEMETADATAONLYQUERIES, false);
+		conf.setBoolVar(HIVEOPTINDEXFILTER, false);
+		conf.setBoolVar(HIVECONVERTJOIN, false);
+		conf.setBoolVar(HIVESKEWJOIN, false);
+
+		// Defaults to a 1000 millis sleep in. We can speed up the tests a bit by setting this to 1 millis instead.
+		// org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper.
+		hiveConf.setLongVar(HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL, 1L);
+
+		hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN, true);
+	}
+
+	private void configureJavaSecurityRealm(HiveConf hiveConf) {
+		// These three properties gets rid of: 'Unable to load realm info from SCDynamicStore'
+		// which seems to have a timeout of about 5 secs.
+		System.setProperty("java.security.krb5.realm", "");
+		System.setProperty("java.security.krb5.kdc", "");
+		System.setProperty("java.security.krb5.conf", "/dev/null");
+	}
+
+	private void configureAssertionStatus(HiveConf conf) {
+		ClassLoader.getSystemClassLoader().setPackageAssertionStatus(
+				"org.apache.hadoop.hive.serde2.objectinspector",
+				false);
+	}
+
+	private void configureSupportConcurrency(HiveConf conf) {
+		hiveConf.setBoolVar(HIVE_SUPPORT_CONCURRENCY, false);
+	}
+
+	private void configureMetaStore(HiveConf conf) {
+
+		String jdbcDriver = org.apache.derby.jdbc.EmbeddedDriver.class.getName();
+		try {
+			Class.forName(jdbcDriver);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+
+		// Set the Hive Metastore DB driver
+		hiveConf.set("datanucleus.schema.autoCreateAll", "true");
+		hiveConf.set("hive.metastore.schema.verification", "false");
+		hiveConf.set("hive.metastore.uris", toHmsURI());
+		// No pooling needed. This will save us a lot of threads
+		hiveConf.set("datanucleus.connectionPoolingType", "None");
+
+		conf.setBoolVar(METASTORE_VALIDATE_CONSTRAINTS, true);
+		conf.setBoolVar(METASTORE_VALIDATE_COLUMNS, true);
+		conf.setBoolVar(METASTORE_VALIDATE_TABLES, true);
+
+		// disable authorization to avoid NPE
+		conf.set(HIVE_AUTHORIZATION_MANAGER.varname,
+				"org.apache.hive.hcatalog.storagehandler.DummyHCatAuthProvider");
+	}
+
+	private void configureFileSystem(TemporaryFolder basedir, HiveConf conf) {
+
+		createAndSetFolderProperty(METASTOREWAREHOUSE, "warehouse", conf, basedir);
+		createAndSetFolderProperty(SCRATCHDIR, "scratchdir", conf, basedir);
+		createAndSetFolderProperty(LOCALSCRATCHDIR, "localscratchdir", conf, basedir);
+		createAndSetFolderProperty(HIVEHISTORYFILELOC, "tmp", conf, basedir);
+
+		conf.setBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS, true);
+
+		createAndSetFolderProperty("hadoop.tmp.dir", "hadooptmp", conf, basedir);
+		createAndSetFolderProperty("test.log.dir", "logs", conf, basedir);
+	}
+
+	private File newFolder(TemporaryFolder basedir, String folder) {
+		try {
+			File newFolder = basedir.newFolder(folder);
+			FileUtil.setPermission(newFolder, FsPermission.getDirDefault());
+			return newFolder;
+		} catch (IOException e) {
+			throw new IllegalStateException("Failed to create tmp dir: " + e.getMessage(), e);
+		}
+	}
+
+	public HiveConf getHiveConf() {
+		return hiveConf;
+	}
+
+	@Override
+	public TemporaryFolder getBaseDir() {
+		return basedir;
+	}
+
+	private void createAndSetFolderProperty(
+			HiveConf.ConfVars var, String folder, HiveConf conf,
+			TemporaryFolder basedir) {
+		conf.setVar(var, newFolder(basedir, folder).getAbsolutePath());
+	}
+
+	private void createAndSetFolderProperty(String key, String folder, HiveConf conf, TemporaryFolder basedir) {
+		conf.set(key, newFolder(basedir, folder).getAbsolutePath());
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShim.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShim.java
new file mode 100644
index 0000000..32742d8
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShim.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+
+/**
+ * Shim layer for hive runner dependency.
+ */
+public interface HiveRunnerShim {
+
+	/**
+	 * Sets CommandShellEmulation for HiveShellBuilder.
+	 */
+	void setCommandShellEmulation(HiveShellBuilder builder, HiveRunnerConfig config) throws Exception;
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimLoader.java
new file mode 100644
index 0000000..05e6430
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Loader to load proper HiveRunnerShim.
+ */
+public class HiveRunnerShimLoader {
+
+	private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>();
+
+	private HiveRunnerShimLoader() {
+	}
+
+	public static HiveRunnerShim load() {
+		String hiveVersion = HiveShimLoader.getHiveVersion();
+		return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> {
+			switch (v) {
+				case HiveShimLoader.HIVE_V1_VERSION_NAME:
+					return new HiveRunnerShimV3();
+				case HiveShimLoader.HIVE_V2_VERSION_NAME:
+					return new HiveRunnerShimV4();
+				default:
+					throw new RuntimeException("Unsupported Hive version " + v);
+			}
+		});
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV3.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV3.java
new file mode 100644
index 0000000..c405c16
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV3.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+
+import java.lang.reflect.Method;
+
+/**
+ * Shim for hive runner 3.x.
+ */
+public class HiveRunnerShimV3 implements HiveRunnerShim {
+
+	@Override
+	public void setCommandShellEmulation(HiveShellBuilder builder, HiveRunnerConfig config) throws Exception {
+		Method method = HiveRunnerConfig.class.getDeclaredMethod("getCommandShellEmulation");
+		Object emulation = method.invoke(config);
+		Class emulationClz = Class.forName("com.klarna.hiverunner.CommandShellEmulation");
+		method = HiveShellBuilder.class.getDeclaredMethod("setCommandShellEmulation", emulationClz);
+		method.invoke(builder, emulation);
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV4.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV4.java
new file mode 100644
index 0000000..625867a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV4.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+
+import java.lang.reflect.Method;
+
+/**
+ * Shim for hive runner 4.x.
+ */
+public class HiveRunnerShimV4 implements HiveRunnerShim {
+
+	@Override
+	public void setCommandShellEmulation(HiveShellBuilder builder, HiveRunnerConfig config) throws Exception {
+		Method method = config.getClass().getDeclaredMethod("getCommandShellEmulator");
+		Object emulator = method.invoke(config);
+		Class emulatorClz = Class.forName("com.klarna.hiverunner.sql.cli.CommandShellEmulator");
+		method = builder.getClass().getDeclaredMethod("setCommandShellEmulation", emulatorClz);
+		method.invoke(builder, emulator);
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index e6505df..44124d9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,51 +23,48 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests {@link HiveTableSink}.
  */
+@RunWith(FlinkStandaloneHiveRunner.class)
 public class HiveTableSinkTest {
 
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
+
 	private static HiveCatalog hiveCatalog;
 	private static HiveConf hiveConf;
 
 	@BeforeClass
 	public static void createCatalog() throws IOException {
-		hiveConf = HiveTestUtils.createHiveConf();
+		hiveConf = hiveShell.getHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
 	}
@@ -91,13 +88,17 @@ public class HiveTableSinkTest {
 		List<Row> toWrite = generateRecords(5);
 		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
 
-		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 		CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
 		tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
 		tableEnv.sqlQuery("select * from src").insertInto("destSink");
 		execEnv.execute();
 
-		verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
+		List<String> result = hiveShell.executeQuery("select * from " + tblName);
+		assertEquals(toWrite.size(), result.size());
+		for (int i = 0; i < result.size(); i++) {
+			assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
+		}
+
 		hiveCatalog.dropTable(tablePath, false);
 	}
 
@@ -113,7 +114,6 @@ public class HiveTableSinkTest {
 		List<Row> toWrite = generateRecords(5);
 		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
 
-		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 		CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
 		tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
 		tableEnv.sqlQuery("select * from src").insertInto("destSink");
@@ -121,10 +121,10 @@ public class HiveTableSinkTest {
 
 		List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
 		assertEquals(toWrite.size(), partitionSpecs.size());
-		for (int i = 0; i < toWrite.size(); i++) {
-			CatalogPartition partition = hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
-			String partitionLocation = partition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
-			verifyWrittenData(new Path(partitionLocation, "0"), Collections.singletonList(toWrite.get(i)), 1);
+
+		List<String> result = hiveShell.executeQuery("select * from " + tblName);
+		for (int i = 0; i < result.size(); i++) {
+			assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
 		}
 
 		hiveCatalog.dropTable(tablePath, false);
@@ -163,13 +163,14 @@ public class HiveTableSinkTest {
 		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
 		tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
 
-		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
 		tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
 		tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
 		execEnv.execute();
 
-		verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 2 3,1 a 2 b,3 c"));
+		List<String> result = hiveShell.executeQuery("select * from " + tblName);
+		assertEquals(1, result.size());
+		assertEquals("[1,2,3]\t{1:\"a\",2:\"b\"}\t{\"f1\":3,\"f2\":\"c\"}", result.get(0));
 		hiveCatalog.dropTable(tablePath, false);
 
 		// nested complex types
@@ -191,13 +192,14 @@ public class HiveTableSinkTest {
 		toWrite.add(row);
 
 		tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
-		hiveTable = hiveCatalog.getHiveTable(tablePath);
 		catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
 		tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
 		tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink");
 		execEnv.execute();
 
-		verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 a 2 b 3 c"));
+		result = hiveShell.executeQuery("select * from " + tblName);
+		assertEquals(1, result.size());
+		assertEquals("[{\"f1\":1,\"f2\":\"a\"},{\"f1\":2,\"f2\":\"b\"},{\"f1\":3,\"f2\":\"c\"}]", result.get(0));
 		hiveCatalog.dropTable(tablePath, false);
 	}
 
@@ -240,27 +242,4 @@ public class HiveTableSinkTest {
 		}
 		return res;
 	}
-
-	private void verifyWrittenData(Path outputFile, List<Row> expectedRows, int numPartCols) throws Exception {
-		int[] fields = IntStream.range(0, expectedRows.get(0).getArity() - numPartCols).toArray();
-		List<String> expected = new ArrayList<>(expectedRows.size());
-		for (Row row : expectedRows) {
-			expected.add(Row.project(row, fields).toString());
-		}
-		verifyWrittenData(outputFile, expected);
-	}
-
-	private void verifyWrittenData(Path outputFile, List<String> expected) throws Exception {
-		FileSystem fs = outputFile.getFileSystem(hiveConf);
-		assertTrue(fs.exists(outputFile));
-		try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
-			int numWritten = 0;
-			String line = reader.readLine();
-			while (line != null) {
-				assertEquals(expected.get(numWritten++), line.replaceAll("\u0001", ",").replaceAll("\u0002", " ").replaceAll("\u0003", " "));
-				line = reader.readLine();
-			}
-			assertEquals(expected.size(), numWritten);
-		}
-	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index d0b4312..82e4d8e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Test utils for Hive connector.
@@ -36,6 +40,10 @@ public class HiveTestUtils {
 	private static final String HIVE_WAREHOUSE_URI_FORMAT = "jdbc:derby:;databaseName=%s;create=true";
 	private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+	// range of ephemeral ports
+	private static final int MIN_EPH_PORT = 49152;
+	private static final int MAX_EPH_PORT = 61000;
+
 	/**
 	 * Create a HiveCatalog with an embedded Hive Metastore.
 	 */
@@ -70,4 +78,21 @@ public class HiveTestUtils {
 				"Failed to create test HiveConf to HiveCatalog.", e);
 		}
 	}
+
+	// Gets a free port of localhost. Note that this method suffers the "time of check to time of use" race condition.
+	// Use it as best efforts to avoid port conflicts.
+	public static int getFreePort() throws IOException {
+		final int numPorts = MAX_EPH_PORT - MIN_EPH_PORT + 1;
+		int numAttempt = 0;
+		while (numAttempt++ < numPorts) {
+			int p = ThreadLocalRandom.current().nextInt(numPorts) + MIN_EPH_PORT;
+			try (ServerSocket socket = new ServerSocket()) {
+				socket.bind(new InetSocketAddress("localhost", p));
+				return socket.getLocalPort();
+			} catch (BindException e) {
+				// this port is in use, try another one
+			}
+		}
+		throw new RuntimeException("Exhausted all ephemeral ports and didn't find a free one");
+	}
 }