Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/09 16:17:13 UTC
[flink] branch master updated: [FLINK-13090][hive] Test Hive connector with hive runner
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1e64e [FLINK-13090][hive] Test Hive connector with hive runner
5d1e64e is described below
commit 5d1e64eda1c4dc9b3563641d393bf1953e4ac06b
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jul 4 16:34:55 2019 +0800
[FLINK-13090][hive] Test Hive connector with hive runner
This PR uses hive runner in our hive connector tests.
This closes #8987.
---
flink-connectors/flink-connector-hive/pom.xml | 99 ++++-
.../table/catalog/hive/client/HiveShimLoader.java | 4 +-
.../connectors/hive/FlinkStandaloneHiveRunner.java | 464 +++++++++++++++++++++
.../hive/FlinkStandaloneHiveServerContext.java | 222 ++++++++++
.../batch/connectors/hive/HiveRunnerShim.java | 33 ++
.../connectors/hive/HiveRunnerShimLoader.java | 49 +++
.../batch/connectors/hive/HiveRunnerShimV3.java | 39 ++
.../batch/connectors/hive/HiveRunnerShimV4.java | 39 ++
.../batch/connectors/hive/HiveTableSinkTest.java | 69 ++-
.../flink/table/catalog/hive/HiveTestUtils.java | 25 ++
10 files changed, 992 insertions(+), 51 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 3b549b6..5ceab9d 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -35,6 +35,11 @@ under the License.
<packaging>jar</packaging>
+ <properties>
+ <hiverunner.version>4.0.0</hiverunner.version>
+ <reflections.version>0.9.8</reflections.version>
+ </properties>
+
<dependencies>
<!-- core dependencies -->
@@ -260,10 +265,6 @@ under the License.
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
@@ -373,6 +374,95 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.klarna</groupId>
+ <artifactId>hiverunner</artifactId>
+ <version>${hiverunner.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-webhcat-java-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-contrib</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- We have 0.9.10 in dependency management but hiverunner requires 0.9.8, so we need to explicitly specify it here -->
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>${reflections.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
<build>
@@ -440,6 +530,7 @@ under the License.
<properties>
<hive.version>1.2.1</hive.version>
<hivemetastore.hadoop.version>2.6.5</hivemetastore.hadoop.version>
+ <hiverunner.version>3.2.1</hiverunner.version>
</properties>
</profile>
</profiles>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
index 3a7bd5f..771cfc0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
@@ -32,8 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class HiveShimLoader {
- private static final String HIVE_V1_VERSION_NAME = "1.2.1";
- private static final String HIVE_V2_VERSION_NAME = "2.3.4";
+ public static final String HIVE_V1_VERSION_NAME = "1.2.1";
+ public static final String HIVE_V2_VERSION_NAME = "2.3.4";
private static final Map<String, HiveShim> hiveShims = new ConcurrentHashMap<>(2);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveRunner.java
new file mode 100644
index 0000000..1df9904
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
+import org.apache.flink.shaded.guava18.com.google.common.base.Splitter;
+import org.apache.flink.shaded.guava18.com.google.common.base.Throwables;
+import org.apache.flink.shaded.guava18.com.google.common.io.Resources;
+
+import com.klarna.hiverunner.HiveServerContainer;
+import com.klarna.hiverunner.HiveServerContext;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.HiveShellContainer;
+import com.klarna.hiverunner.annotations.HiveProperties;
+import com.klarna.hiverunner.annotations.HiveResource;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import com.klarna.hiverunner.annotations.HiveSetupScript;
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import com.klarna.reflection.ReflectionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.junit.Ignore;
+import org.junit.internal.AssumptionViolatedException;
+import org.junit.internal.runners.model.EachTestNotifier;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.InitializationError;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.net.www.ParseUtil;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEHISTORYFILELOC;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.SCRATCHDIR;
+import static org.reflections.ReflectionUtils.withAnnotation;
+
+/**
+ * JUnit 4 runner that runs Hive SQL on a HiveServer residing in this JVM. No external dependencies are needed.
+ * Inspired by (and almost entirely copied from) StandaloneHiveRunner.java, but using a local metastore server
+ * instead of an embedded Hive metastore.
+ */
+public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStandaloneHiveRunner.class);
+ private static final Duration HMS_START_TIMEOUT = Duration.ofSeconds(30);
+ private Future<Void> hmsWatcher;
+ private int hmsPort;
+ private HiveShellContainer container;
+ private HiveRunnerConfig config = new HiveRunnerConfig();
+
+ public FlinkStandaloneHiveRunner(Class<?> clazz) throws InitializationError {
+ super(clazz);
+ }
+
+ @Override
+ protected List<TestRule> classRules() {
+ final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ try {
+ hmsPort = HiveTestUtils.getFreePort();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ HiveServerContext context = new FlinkStandaloneHiveServerContext(temporaryFolder, config, hmsPort);
+ List<TestRule> rules = super.classRules();
+ ExternalResource hms = new ExternalResource() {
+ @Override
+ protected void before() throws Throwable {
+ LOGGER.info("Setting up {} in {}", getName(), temporaryFolder.getRoot().getAbsolutePath());
+ hmsWatcher = startHMS(context, hmsPort);
+ }
+
+ @Override
+ protected void after() {
+ if (hmsWatcher != null) {
+ hmsWatcher.cancel(true);
+ }
+ }
+ };
+ ExternalResource hiveShell = new ExternalResource() {
+ @Override
+ protected void before() throws Throwable {
+ container = createHiveServerContainer(getTestClass().getJavaClass(), context);
+ }
+
+ @Override
+ protected void after() {
+ tearDown();
+ }
+ };
+ rules.add(hiveShell);
+ rules.add(hms);
+ rules.add(temporaryFolder);
+ return rules;
+ }
+
+ @Override
+ protected void runChild(final FrameworkMethod method, RunNotifier notifier) {
+ Description description = describeChild(method);
+ if (method.getAnnotation(Ignore.class) != null) {
+ notifier.fireTestIgnored(description);
+ } else {
+ EachTestNotifier eachNotifier = new EachTestNotifier(notifier, description);
+ eachNotifier.fireTestStarted();
+ try {
+ runTestMethod(method, eachNotifier);
+ } finally {
+ eachNotifier.fireTestFinished();
+ }
+ }
+ }
+
+ /**
+ * Runs a {@link Statement} that represents a leaf (aka atomic) test.
+ */
+ private void runTestMethod(FrameworkMethod method,
+ EachTestNotifier notifier) {
+ Statement statement = methodBlock(method);
+
+ try {
+ statement.evaluate();
+ } catch (AssumptionViolatedException e) {
+ notifier.addFailedAssumption(e);
+ } catch (Throwable e) {
+ notifier.addFailure(e);
+ }
+ }
+
+ private void tearDown() {
+ if (container != null) {
+ LOGGER.info("Tearing down {}", getName());
+ try {
+ container.tearDown();
+ } catch (Throwable e) {
+ LOGGER.warn("Tear down failed: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Traverses the test class annotations and injects a HiveShell into the test case that envelops the HiveServer.
+ */
+ private HiveShellContainer createHiveServerContainer(final Class testClass, HiveServerContext context)
+ throws Exception {
+
+ final HiveServerContainer hiveServerContainer = new HiveServerContainer(context);
+
+ HiveShellBuilder hiveShellBuilder = new HiveShellBuilder();
+ HiveRunnerShim hiveRunnerShim = HiveRunnerShimLoader.load();
+ hiveRunnerShim.setCommandShellEmulation(hiveShellBuilder, config);
+
+ HiveShellField shellSetter = loadScriptsUnderTest(testClass, hiveShellBuilder);
+
+ hiveShellBuilder.setHiveServerContainer(hiveServerContainer);
+
+ loadAnnotatedResources(testClass, hiveShellBuilder);
+
+ loadAnnotatedProperties(testClass, hiveShellBuilder);
+
+ loadAnnotatedSetupScripts(testClass, hiveShellBuilder);
+
+ // Build shell
+ final HiveShellContainer shell = hiveShellBuilder.buildShell();
+
+ // Set shell
+ shellSetter.setShell(shell);
+
+ if (shellSetter.isAutoStart()) {
+ shell.start();
+ }
+
+ return shell;
+ }
+
+ private HiveShellField loadScriptsUnderTest(final Class testClass, HiveShellBuilder hiveShellBuilder) {
+ try {
+ Set<Field> fields = ReflectionUtils.getAllFields(testClass, withAnnotation(HiveSQL.class));
+
+ Preconditions.checkState(fields.size() == 1, "Exactly one field should be annotated with @HiveSQL");
+
+ final Field field = fields.iterator().next();
+ List<Path> scripts = new ArrayList<>();
+ HiveSQL annotation = field.getAnnotation(HiveSQL.class);
+ for (String scriptFilePath : annotation.files()) {
+ Path file = Paths.get(Resources.getResource(scriptFilePath).toURI());
+ Preconditions.checkState(Files.exists(file), "File " + file + " does not exist");
+ scripts.add(file);
+ }
+
+ Charset charset = annotation.encoding().equals("") ?
+ Charset.defaultCharset() : Charset.forName(annotation.encoding());
+
+ final boolean isAutoStart = annotation.autoStart();
+
+ hiveShellBuilder.setScriptsUnderTest(scripts, charset);
+
+ return new HiveShellField() {
+ @Override
+ public void setShell(HiveShell shell) {
+ ReflectionUtils.setStaticField(testClass, field.getName(), shell);
+ }
+
+ @Override
+ public boolean isAutoStart() {
+ return isAutoStart;
+ }
+ };
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("Failed to init field annotated with @HiveSQL: " + t.getMessage(), t);
+ }
+ }
+
+ private void loadAnnotatedSetupScripts(Class testClass, HiveShellBuilder hiveShellBuilder) {
+ Set<Field> setupScriptFields = ReflectionUtils.getAllFields(testClass, withAnnotation(HiveSetupScript.class));
+ for (Field setupScriptField : setupScriptFields) {
+ if (ReflectionUtils.isOfType(setupScriptField, String.class)) {
+ String script = ReflectionUtils.getStaticFieldValue(testClass, setupScriptField.getName(), String.class);
+ hiveShellBuilder.addSetupScript(script);
+ } else if (ReflectionUtils.isOfType(setupScriptField, File.class) ||
+ ReflectionUtils.isOfType(setupScriptField, Path.class)) {
+ Path path = getMandatoryPathFromField(testClass, setupScriptField);
+ hiveShellBuilder.addSetupScript(readAll(path));
+ } else {
+ throw new IllegalArgumentException(
+ "Field annotated with @HiveSetupScript currently only supports type String, File and Path");
+ }
+ }
+ }
+
+ private static String readAll(Path path) {
+ try {
+ return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to read " + path + ": " + e.getMessage(), e);
+ }
+ }
+
+ private void loadAnnotatedResources(Class testClass, HiveShellBuilder workFlowBuilder) throws IOException {
+ Set<Field> fields = ReflectionUtils.getAllFields(testClass, withAnnotation(HiveResource.class));
+
+ for (Field resourceField : fields) {
+
+ HiveResource annotation = resourceField.getAnnotation(HiveResource.class);
+ String targetFile = annotation.targetFile();
+
+ if (ReflectionUtils.isOfType(resourceField, String.class)) {
+ String data = ReflectionUtils.getStaticFieldValue(testClass, resourceField.getName(), String.class);
+ workFlowBuilder.addResource(targetFile, data);
+ } else if (ReflectionUtils.isOfType(resourceField, File.class) ||
+ ReflectionUtils.isOfType(resourceField, Path.class)) {
+ Path dataFile = getMandatoryPathFromField(testClass, resourceField);
+ workFlowBuilder.addResource(targetFile, dataFile);
+ } else {
+ throw new IllegalArgumentException(
+ "Fields annotated with @HiveResource currently only supports field type String, File or Path");
+ }
+ }
+ }
+
+ private Path getMandatoryPathFromField(Class testClass, Field resourceField) {
+ Path path;
+ if (ReflectionUtils.isOfType(resourceField, File.class)) {
+ File dataFile = ReflectionUtils.getStaticFieldValue(testClass, resourceField.getName(), File.class);
+ path = Paths.get(dataFile.toURI());
+ } else if (ReflectionUtils.isOfType(resourceField, Path.class)) {
+ path = ReflectionUtils.getStaticFieldValue(testClass, resourceField.getName(), Path.class);
+ } else {
+ throw new IllegalArgumentException(
+ "Only Path or File type is allowed on annotated field " + resourceField);
+ }
+
+ Preconditions.checkArgument(Files.exists(path), "File %s does not exist", path);
+ return path;
+ }
+
+ private void loadAnnotatedProperties(Class testClass, HiveShellBuilder workFlowBuilder) {
+ for (Field hivePropertyField : ReflectionUtils.getAllFields(testClass, withAnnotation(HiveProperties.class))) {
+ Preconditions.checkState(ReflectionUtils.isOfType(hivePropertyField, Map.class),
+ "Field annotated with @HiveProperties should be of type Map<String, String>");
+ workFlowBuilder.putAllProperties(
+ ReflectionUtils.getStaticFieldValue(testClass, hivePropertyField.getName(), Map.class));
+ }
+ }
+
+ /**
+ * Used as a handle for the HiveShell field in the test case so that we may set it once the
+ * HiveShell has been instantiated.
+ */
+ interface HiveShellField {
+ void setShell(HiveShell shell);
+
+ boolean isAutoStart();
+ }
+
+ /**
+ * Launches HMS process and returns a Future representing that process.
+ */
+ private static Future<Void> startHMS(HiveServerContext context, int port) throws Exception {
+ context.init();
+ HiveConf outsideConf = context.getHiveConf();
+ List<String> args = new ArrayList<>();
+ String javaHome = System.getProperty("java.home");
+ args.add(Joiner.on(File.separator).join(javaHome, "bin", "java"));
+ // set classpath
+ List<String> cpElements = new ArrayList<>();
+ String classpath = System.getProperty("java.class.path");
+ for (String path : Splitter.on(File.pathSeparator).split(classpath)) {
+ cpElements.add(ParseUtil.encodePath(path));
+ }
+ args.add("-cp");
+ args.add(String.join(File.pathSeparator, cpElements));
+
+ // set sys properties
+ // TODO: generate hive-site.xml at runtime?
+ args.add(hiveCmdLineConfig(METASTOREWAREHOUSE.varname, outsideConf.getVar(METASTOREWAREHOUSE)));
+ args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
+ args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
+ args.add(hiveCmdLineConfig(HIVEHISTORYFILELOC.varname, outsideConf.getVar(HIVEHISTORYFILELOC)));
+ args.add(hiveCmdLineConfig(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS.varname,
+ String.valueOf(outsideConf.getBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS))));
+ args.add(hiveCmdLineConfig("hadoop.tmp.dir", outsideConf.get("hadoop.tmp.dir")));
+ args.add(hiveCmdLineConfig("test.log.dir", outsideConf.get("test.log.dir")));
+ String metaStorageUrl = "jdbc:derby:memory:" + UUID.randomUUID().toString();
+ args.add(hiveCmdLineConfig(METASTORECONNECTURLKEY.varname, metaStorageUrl + ";create=true"));
+ // config derby.log file
+ File derbyLog = File.createTempFile("derby", ".log");
+ derbyLog.deleteOnExit();
+ args.add(hiveCmdLineConfig("derby.stream.error.file", derbyLog.getAbsolutePath()));
+
+ args.add(HiveMetaStore.class.getCanonicalName());
+ args.add("-p");
+ args.add(String.valueOf(port));
+
+ ProcessBuilder builder = new ProcessBuilder(args);
+ Process process = builder.start();
+ Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
+ Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
+ inLogger.setDaemon(true);
+ inLogger.setName("HMS-IN-Logger");
+ errLogger.setDaemon(true);
+ errLogger.setName("HMS-ERR-Logger");
+ inLogger.start();
+ errLogger.start();
+
+ FutureTask<Void> res = new FutureTask<>(() -> {
+ try {
+ int r = process.waitFor();
+ inLogger.join();
+ errLogger.join();
+ if (r != 0) {
+ throw new RuntimeException("HMS process exited with " + r);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.info("Shutting down HMS");
+ } finally {
+ if (process.isAlive()) {
+ // give it a chance to terminate gracefully
+ process.destroy();
+ try {
+ process.waitFor(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
+ }
+ process.destroyForcibly();
+ }
+ }
+ }, null);
+ Thread thread = new Thread(res);
+ thread.setName("HMS-Watcher");
+ // we need the watcher thread to kill HMS, don't make it daemon
+ thread.setDaemon(false);
+ thread.start();
+ waitForHMSStart(port);
+ return res;
+ }
+
+ private static void waitForHMSStart(int port) throws Exception {
+ final long deadline = System.currentTimeMillis() + HMS_START_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port))) {
+ LOGGER.info("HMS started at port {}", port);
+ return;
+ } catch (ConnectException e) {
+ LOGGER.info("Waiting for HMS to start...");
+ Thread.sleep(1000);
+ }
+ }
+ throw new java.util.concurrent.TimeoutException("Timeout waiting for HMS to start");
+ }
+
+ private static String hiveCmdLineConfig(String name, String val) {
+ return String.format("-D%s=%s", name, val);
+ }
+
+ private static class LogRedirect implements Runnable {
+ private final InputStream inputStream;
+ private final Logger logger;
+
+ LogRedirect(InputStream inputStream, Logger logger) {
+ this.inputStream = inputStream;
+ this.logger = logger;
+ }
+
+ @Override
+ public void run() {
+ try {
+ new BufferedReader(new InputStreamReader(inputStream)).lines().forEach(logger::info);
+ } catch (Exception e) {
+ logger.error(Throwables.getStackTraceAsString(e));
+ }
+ }
+ }
+}
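For context, a test class hooks into this runner through the standard HiveRunner annotations, exactly as
HiveTableSinkTest does later in this commit. A minimal sketch (class, table, and method names here are
hypothetical, not part of the commit):

    import com.klarna.hiverunner.HiveShell;
    import com.klarna.hiverunner.annotations.HiveSQL;
    import org.junit.Test;
    import org.junit.runner.RunWith;

    import java.util.List;

    import static org.junit.Assert.assertEquals;

    @RunWith(FlinkStandaloneHiveRunner.class)
    public class MyHiveTest {

        // the runner finds this field via @HiveSQL and injects the HiveShell into it
        @HiveSQL(files = {})
        private static HiveShell hiveShell;

        @Test
        public void testQuery() {
            hiveShell.execute("create table foo (x int)");
            hiveShell.execute("insert into table foo values (1)");
            List<String> rows = hiveShell.executeQuery("select * from foo");
            assertEquals(1, rows.size());
        }
    }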
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveServerContext.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveServerContext.java
new file mode 100644
index 0000000..62ccaae
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/FlinkStandaloneHiveServerContext.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.HiveServerContext;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HADOOPBIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEHISTORYFILELOC;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEMETADATAONLYQUERIES;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTINDEXFILTER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESKEWJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSAUTOGATHER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_CBO_ENABLED;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_COLUMNS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_CONSTRAINTS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_TABLES;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.SCRATCHDIR;
+
+/**
+ * HiveServerContext used by FlinkStandaloneHiveRunner.
+ */
+public class FlinkStandaloneHiveServerContext implements HiveServerContext {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStandaloneHiveServerContext.class);
+
+ private HiveConf hiveConf = new HiveConf();
+
+ private final TemporaryFolder basedir;
+ private final HiveRunnerConfig hiveRunnerConfig;
+ private boolean inited = false;
+ private final int hmsPort;
+
+ FlinkStandaloneHiveServerContext(TemporaryFolder basedir, HiveRunnerConfig hiveRunnerConfig, int hmsPort) {
+ this.basedir = basedir;
+ this.hiveRunnerConfig = hiveRunnerConfig;
+ this.hmsPort = hmsPort;
+ }
+
+ private String toHmsURI() {
+ return "thrift://localhost:" + hmsPort;
+ }
+
+ @Override
+ public final void init() {
+ if (!inited) {
+
+ configureMiscHiveSettings(hiveConf);
+
+ configureMetaStore(hiveConf);
+
+ configureMrExecutionEngine(hiveConf);
+
+ configureJavaSecurityRealm(hiveConf);
+
+ configureSupportConcurrency(hiveConf);
+
+ configureFileSystem(basedir, hiveConf);
+
+ configureAssertionStatus(hiveConf);
+
+ overrideHiveConf(hiveConf);
+ }
+ inited = true;
+ }
+
+ private void configureMiscHiveSettings(HiveConf hiveConf) {
+ hiveConf.setBoolVar(HIVESTATSAUTOGATHER, false);
+
+ // Turn off CBO so we don't depend on calcite
+ hiveConf.setBoolVar(HIVE_CBO_ENABLED, false);
+
+ // Disable to get rid of a clean-up exception when stopping the session.
+ hiveConf.setBoolVar(HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
+
+ hiveConf.setVar(HADOOPBIN, "NO_BIN!");
+ }
+
+ private void overrideHiveConf(HiveConf hiveConf) {
+ for (Map.Entry<String, String> hiveConfEntry : hiveRunnerConfig.getHiveConfSystemOverride().entrySet()) {
+ hiveConf.set(hiveConfEntry.getKey(), hiveConfEntry.getValue());
+ }
+ }
+
+ private void configureMrExecutionEngine(HiveConf conf) {
+
+ /*
+ * Switch off all optimizers, otherwise we can't
+ * contain the map-reduce job within this JVM.
+ */
+ conf.setBoolVar(HIVE_INFER_BUCKET_SORT, false);
+ conf.setBoolVar(HIVEMETADATAONLYQUERIES, false);
+ conf.setBoolVar(HIVEOPTINDEXFILTER, false);
+ conf.setBoolVar(HIVECONVERTJOIN, false);
+ conf.setBoolVar(HIVESKEWJOIN, false);
+
+ // Defaults to a 1000 millis sleep in org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper.
+ // We can speed up the tests a bit by setting this to 1 millis instead.
+ hiveConf.setLongVar(HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL, 1L);
+
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN, true);
+ }
+
+ private void configureJavaSecurityRealm(HiveConf hiveConf) {
+ // These three properties get rid of: 'Unable to load realm info from SCDynamicStore'
+ // which seems to have a timeout of about 5 secs.
+ System.setProperty("java.security.krb5.realm", "");
+ System.setProperty("java.security.krb5.kdc", "");
+ System.setProperty("java.security.krb5.conf", "/dev/null");
+ }
+
+ private void configureAssertionStatus(HiveConf conf) {
+ ClassLoader.getSystemClassLoader().setPackageAssertionStatus(
+ "org.apache.hadoop.hive.serde2.objectinspector",
+ false);
+ }
+
+ private void configureSupportConcurrency(HiveConf conf) {
+ hiveConf.setBoolVar(HIVE_SUPPORT_CONCURRENCY, false);
+ }
+
+ private void configureMetaStore(HiveConf conf) {
+
+ String jdbcDriver = org.apache.derby.jdbc.EmbeddedDriver.class.getName();
+ try {
+ Class.forName(jdbcDriver);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Set the Hive Metastore DB driver
+ hiveConf.set("datanucleus.schema.autoCreateAll", "true");
+ hiveConf.set("hive.metastore.schema.verification", "false");
+ hiveConf.set("hive.metastore.uris", toHmsURI());
+ // No pooling needed. This will save us a lot of threads
+ hiveConf.set("datanucleus.connectionPoolingType", "None");
+
+ conf.setBoolVar(METASTORE_VALIDATE_CONSTRAINTS, true);
+ conf.setBoolVar(METASTORE_VALIDATE_COLUMNS, true);
+ conf.setBoolVar(METASTORE_VALIDATE_TABLES, true);
+
+ // disable authorization to avoid NPE
+ conf.set(HIVE_AUTHORIZATION_MANAGER.varname,
+ "org.apache.hive.hcatalog.storagehandler.DummyHCatAuthProvider");
+ }
+
+ private void configureFileSystem(TemporaryFolder basedir, HiveConf conf) {
+
+ createAndSetFolderProperty(METASTOREWAREHOUSE, "warehouse", conf, basedir);
+ createAndSetFolderProperty(SCRATCHDIR, "scratchdir", conf, basedir);
+ createAndSetFolderProperty(LOCALSCRATCHDIR, "localscratchdir", conf, basedir);
+ createAndSetFolderProperty(HIVEHISTORYFILELOC, "tmp", conf, basedir);
+
+ conf.setBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS, true);
+
+ createAndSetFolderProperty("hadoop.tmp.dir", "hadooptmp", conf, basedir);
+ createAndSetFolderProperty("test.log.dir", "logs", conf, basedir);
+ }
+
+ private File newFolder(TemporaryFolder basedir, String folder) {
+ try {
+ File newFolder = basedir.newFolder(folder);
+ FileUtil.setPermission(newFolder, FsPermission.getDirDefault());
+ return newFolder;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to create tmp dir: " + e.getMessage(), e);
+ }
+ }
+
+ public HiveConf getHiveConf() {
+ return hiveConf;
+ }
+
+ @Override
+ public TemporaryFolder getBaseDir() {
+ return basedir;
+ }
+
+ private void createAndSetFolderProperty(
+ HiveConf.ConfVars var, String folder, HiveConf conf,
+ TemporaryFolder basedir) {
+ conf.setVar(var, newFolder(basedir, folder).getAbsolutePath());
+ }
+
+ private void createAndSetFolderProperty(String key, String folder, HiveConf conf, TemporaryFolder basedir) {
+ conf.set(key, newFolder(basedir, folder).getAbsolutePath());
+ }
+}
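The runner above builds this context and calls init() inside startHMS(), then forks the metastore as a separate
JVM backed by the in-memory Derby database. A condensed sketch of that wiring (the explicit folder.create()
call stands in for what the TemporaryFolder rule normally does; the rest follows the runner code above):

    TemporaryFolder folder = new TemporaryFolder();
    folder.create();
    int hmsPort = HiveTestUtils.getFreePort();
    HiveServerContext context =
        new FlinkStandaloneHiveServerContext(folder, new HiveRunnerConfig(), hmsPort);
    context.init();                            // sets warehouse/scratch dirs, metastore URI, etc.
    HiveConf conf = context.getHiveConf();
    // conf now carries hive.metastore.uris = thrift://localhost:<hmsPort>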
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShim.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShim.java
new file mode 100644
index 0000000..32742d8
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShim.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+
+/**
+ * Shim layer for hive runner dependency.
+ */
+public interface HiveRunnerShim {
+
+ /**
+ * Sets CommandShellEmulation for HiveShellBuilder.
+ */
+ void setCommandShellEmulation(HiveShellBuilder builder, HiveRunnerConfig config) throws Exception;
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimLoader.java
new file mode 100644
index 0000000..05e6430
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Loader to load proper HiveRunnerShim.
+ */
+public class HiveRunnerShimLoader {
+
+ private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>();
+
+ private HiveRunnerShimLoader() {
+ }
+
+ public static HiveRunnerShim load() {
+ String hiveVersion = HiveShimLoader.getHiveVersion();
+ return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> {
+ switch (v) {
+ case HiveShimLoader.HIVE_V1_VERSION_NAME:
+ return new HiveRunnerShimV3();
+ case HiveShimLoader.HIVE_V2_VERSION_NAME:
+ return new HiveRunnerShimV4();
+ default:
+ throw new RuntimeException("Unsupported Hive version " + v);
+ }
+ });
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV3.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV3.java
new file mode 100644
index 0000000..c405c16
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV3.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+
+import java.lang.reflect.Method;
+
+/**
+ * Shim for hive runner 3.x.
+ */
+public class HiveRunnerShimV3 implements HiveRunnerShim {
+
+ @Override
+ public void setCommandShellEmulation(HiveShellBuilder builder, HiveRunnerConfig config) throws Exception {
+ Method method = HiveRunnerConfig.class.getDeclaredMethod("getCommandShellEmulation");
+ Object emulation = method.invoke(config);
+ Class emulationClz = Class.forName("com.klarna.hiverunner.CommandShellEmulation");
+ method = HiveShellBuilder.class.getDeclaredMethod("setCommandShellEmulation", emulationClz);
+ method.invoke(builder, emulation);
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV4.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV4.java
new file mode 100644
index 0000000..625867a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveRunnerShimV4.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import com.klarna.hiverunner.builder.HiveShellBuilder;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+
+import java.lang.reflect.Method;
+
+/**
+ * Shim for hive runner 4.x.
+ */
+public class HiveRunnerShimV4 implements HiveRunnerShim {
+
+ @Override
+ public void setCommandShellEmulation(HiveShellBuilder builder, HiveRunnerConfig config) throws Exception {
+ Method method = config.getClass().getDeclaredMethod("getCommandShellEmulator");
+ Object emulator = method.invoke(config);
+ Class emulatorClz = Class.forName("com.klarna.hiverunner.sql.cli.CommandShellEmulator");
+ method = builder.getClass().getDeclaredMethod("setCommandShellEmulation", emulatorClz);
+ method.invoke(builder, emulator);
+ }
+}
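Between HiveRunner 3.x and 4.x the emulation API changed (HiveRunnerConfig#getCommandShellEmulation returning
com.klarna.hiverunner.CommandShellEmulation vs. HiveRunnerConfig#getCommandShellEmulator returning
com.klarna.hiverunner.sql.cli.CommandShellEmulator), so the shims go through reflection instead of compiling
against either version directly; the hive-1.2.1 profile pins hiverunner 3.2.1 while the default build uses
4.0.0. FlinkStandaloneHiveRunner consumes the shims like this (taken from createHiveServerContainer() above):

    HiveShellBuilder hiveShellBuilder = new HiveShellBuilder();
    HiveRunnerShim hiveRunnerShim = HiveRunnerShimLoader.load();   // V3 or V4, keyed on the Hive version
    hiveRunnerShim.setCommandShellEmulation(hiveShellBuilder, config);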
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index e6505df..44124d9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,51 +23,48 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* Tests {@link HiveTableSink}.
*/
+@RunWith(FlinkStandaloneHiveRunner.class)
public class HiveTableSinkTest {
+ @HiveSQL(files = {})
+ private static HiveShell hiveShell;
+
private static HiveCatalog hiveCatalog;
private static HiveConf hiveConf;
@BeforeClass
public static void createCatalog() throws IOException {
- hiveConf = HiveTestUtils.createHiveConf();
+ hiveConf = hiveShell.getHiveConf();
hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
hiveCatalog.open();
}
@@ -91,13 +88,17 @@ public class HiveTableSinkTest {
List<Row> toWrite = generateRecords(5);
tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
- Table hiveTable = hiveCatalog.getHiveTable(tablePath);
CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
tableEnv.sqlQuery("select * from src").insertInto("destSink");
execEnv.execute();
- verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
+ List<String> result = hiveShell.executeQuery("select * from " + tblName);
+ assertEquals(toWrite.size(), result.size());
+ for (int i = 0; i < result.size(); i++) {
+ assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
+ }
+
hiveCatalog.dropTable(tablePath, false);
}
@@ -113,7 +114,6 @@ public class HiveTableSinkTest {
List<Row> toWrite = generateRecords(5);
tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
- Table hiveTable = hiveCatalog.getHiveTable(tablePath);
CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
tableEnv.sqlQuery("select * from src").insertInto("destSink");
@@ -121,10 +121,10 @@ public class HiveTableSinkTest {
List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
assertEquals(toWrite.size(), partitionSpecs.size());
- for (int i = 0; i < toWrite.size(); i++) {
- CatalogPartition partition = hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
- String partitionLocation = partition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
- verifyWrittenData(new Path(partitionLocation, "0"), Collections.singletonList(toWrite.get(i)), 1);
+
+ List<String> result = hiveShell.executeQuery("select * from " + tblName);
+ for (int i = 0; i < result.size(); i++) {
+ assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
}
hiveCatalog.dropTable(tablePath, false);
@@ -163,13 +163,14 @@ public class HiveTableSinkTest {
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
- Table hiveTable = hiveCatalog.getHiveTable(tablePath);
CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
execEnv.execute();
- verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 2 3,1 a 2 b,3 c"));
+ List<String> result = hiveShell.executeQuery("select * from " + tblName);
+ assertEquals(1, result.size());
+ assertEquals("[1,2,3]\t{1:\"a\",2:\"b\"}\t{\"f1\":3,\"f2\":\"c\"}", result.get(0));
hiveCatalog.dropTable(tablePath, false);
// nested complex types
@@ -191,13 +192,14 @@ public class HiveTableSinkTest {
toWrite.add(row);
tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
- hiveTable = hiveCatalog.getHiveTable(tablePath);
catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink");
execEnv.execute();
- verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 a 2 b 3 c"));
+ result = hiveShell.executeQuery("select * from " + tblName);
+ assertEquals(1, result.size());
+ assertEquals("[{\"f1\":1,\"f2\":\"a\"},{\"f1\":2,\"f2\":\"b\"},{\"f1\":3,\"f2\":\"c\"}]", result.get(0));
hiveCatalog.dropTable(tablePath, false);
}
@@ -240,27 +242,4 @@ public class HiveTableSinkTest {
}
return res;
}
-
- private void verifyWrittenData(Path outputFile, List<Row> expectedRows, int numPartCols) throws Exception {
- int[] fields = IntStream.range(0, expectedRows.get(0).getArity() - numPartCols).toArray();
- List<String> expected = new ArrayList<>(expectedRows.size());
- for (Row row : expectedRows) {
- expected.add(Row.project(row, fields).toString());
- }
- verifyWrittenData(outputFile, expected);
- }
-
- private void verifyWrittenData(Path outputFile, List<String> expected) throws Exception {
- FileSystem fs = outputFile.getFileSystem(hiveConf);
- assertTrue(fs.exists(outputFile));
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
- int numWritten = 0;
- String line = reader.readLine();
- while (line != null) {
- assertEquals(expected.get(numWritten++), line.replaceAll("\u0001", ",").replaceAll("\u0002", " ").replaceAll("\u0003", " "));
- line = reader.readLine();
- }
- assertEquals(expected.size(), numWritten);
- }
- }
}
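A note on the rewritten assertions: HiveShell#executeQuery returns each result row as a single string with the
columns joined by tab characters (the default delimiter), while Flink's Row#toString joins fields with commas,
hence the replaceAll(",", "\t") before comparing. A small illustration (this only works because the generated
test records contain no embedded commas):

    Row row = Row.of(1, "a", "b");                             // Row.toString() -> "1,a,b"
    String expected = row.toString().replaceAll(",", "\t");    // "1\ta\tb"
    // expected now matches the corresponding line returned by hiveShell.executeQuery(...)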
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index d0b4312..82e4d8e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.ThreadLocalRandom;
/**
* Test utils for Hive connector.
@@ -36,6 +40,10 @@ public class HiveTestUtils {
private static final String HIVE_WAREHOUSE_URI_FORMAT = "jdbc:derby:;databaseName=%s;create=true";
private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ // range of ephemeral ports
+ private static final int MIN_EPH_PORT = 49152;
+ private static final int MAX_EPH_PORT = 61000;
+
/**
* Create a HiveCatalog with an embedded Hive Metastore.
*/
@@ -70,4 +78,21 @@ public class HiveTestUtils {
"Failed to create test HiveConf to HiveCatalog.", e);
}
}
+
+ // Gets a free port on localhost. Note that this method suffers from the "time of check to time of use" race
+ // condition. Use it on a best-effort basis to avoid port conflicts.
+ public static int getFreePort() throws IOException {
+ final int numPorts = MAX_EPH_PORT - MIN_EPH_PORT + 1;
+ int numAttempt = 0;
+ while (numAttempt++ < numPorts) {
+ int p = ThreadLocalRandom.current().nextInt(numPorts) + MIN_EPH_PORT;
+ try (ServerSocket socket = new ServerSocket()) {
+ socket.bind(new InetSocketAddress("localhost", p));
+ return socket.getLocalPort();
+ } catch (BindException e) {
+ // this port is in use, try another one
+ }
+ }
+ throw new RuntimeException("Exhausted all ephemeral ports and didn't find a free one");
+ }
}
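Usage of the new helper is best-effort by design: the port is free when probed, but nothing reserves it, so the
subsequent bind by the forked metastore can still fail. A minimal sketch of how the runner consumes it (error
handling is left to the caller):

    int hmsPort = HiveTestUtils.getFreePort();   // free at probe time, not reserved
    // FlinkStandaloneHiveRunner then forks the metastore with "-p <hmsPort>" and polls
    // waitForHMSStart(hmsPort); if another process grabs the port first, startup fails
    // and the test errors out. The random probing only makes collisions unlikely.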