You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2019/07/15 10:41:15 UTC
[hive] 04/05: HIVE-21965: Implement parallel processing in HiveStrictManagedMigration (Krisztian Kasa via Zoltan Haindrich)
This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
commit cff3c626e201f1ab6efb2fd58a28fa80ccd83104
Author: Krisztian Kasa <kk...@cloudera.com>
AuthorDate: Mon Jul 15 11:32:50 2019 +0200
HIVE-21965: Implement parallel processing in HiveStrictManagedMigration (Krisztian Kasa via Zoltan Haindrich)
Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
.../hadoop/hive/ql/util/CloseableThreadLocal.java | 59 ++
.../hive/ql/util/HiveStrictManagedMigration.java | 629 ++++++++++++++-------
.../ql/util/NamedForkJoinWorkerThreadFactory.java | 40 ++
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 16 +-
.../hive/ql/util/CloseableThreadLocalTest.java | 86 +++
.../ql/util/TestHiveStrictManagedMigration.java | 78 +++
6 files changed, 683 insertions(+), 225 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java
new file mode 100644
index 0000000..efffd38
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class has similar functionality as {@link ThreadLocal}.
+ * Plus it provides a close function to clean up resources in all threads where the resource was initialized.
+ * Resources are kept in a map keyed by {@link Thread}; entries are only removed by {@link #close()}.
+ * @param <T> - type of resource, it must implement {@link AutoCloseable}
+ */
+public class CloseableThreadLocal<T extends AutoCloseable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+ private final ConcurrentHashMap<Thread, T> threadLocalMap;
+ private final Supplier<T> initialValue;
+
+ /**
+ * @param initialValue creates the resource for a thread the first time that thread calls {@link #get()}
+ * @param poolSize expected number of threads; only used as the initial capacity of the backing map
+ */
+ public CloseableThreadLocal(Supplier<T> initialValue, int poolSize) {
+ this.initialValue = initialValue;
+ threadLocalMap = new ConcurrentHashMap<>(poolSize);
+ }
+
+ /**
+ * @return the calling thread's resource, created via the supplier on the first call from that thread
+ * (or on the first call after {@link #close()})
+ */
+ public T get() {
+ return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+ }
+
+ /**
+ * Closes every resource created so far and forgets it. Each entry is removed before it is closed, so a
+ * later {@link #get()} creates a fresh resource instead of handing out a closed one, and the map no longer
+ * keeps the owning threads reachable. Errors while closing are logged; remaining resources are still closed.
+ */
+ public void close() {
+ for (Thread thread : threadLocalMap.keySet()) {
+ // remove() is atomic, so each resource is closed at most once even if close() races with itself
+ T resource = threadLocalMap.remove(thread);
+ if (resource != null) {
+ closeQuietly(resource);
+ }
+ }
+ }
+
+ private void closeQuietly(AutoCloseable autoCloseable) {
+ try {
+ autoCloseable.close();
+ } catch (Exception e) {
+ LOG.warn("Error while closing resource.", e);
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
index 80025b7..42c4158 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
@@ -19,10 +19,15 @@
package org.apache.hadoop.hive.ql.util;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -39,21 +44,17 @@ import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.cli.CommonCliOptions;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils;
@@ -61,8 +62,6 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.HiveParser.switchDatabaseStatement_return;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -71,6 +70,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
public class HiveStrictManagedMigration {
@@ -85,24 +85,28 @@ public class HiveStrictManagedMigration {
MANAGED // Migrate tables as managed transactional tables
}
- static class RunOptions {
- String dbRegex;
- String tableRegex;
- String oldWarehouseRoot;
- TableMigrationOption migrationOption;
- boolean shouldModifyManagedTableLocation;
- boolean shouldModifyManagedTableOwner;
- boolean shouldModifyManagedTablePermissions;
- boolean dryRun;
-
- public RunOptions(String dbRegex,
- String tableRegex,
- String oldWarehouseRoot,
- TableMigrationOption migrationOption,
- boolean shouldModifyManagedTableLocation,
- boolean shouldModifyManagedTableOwner,
- boolean shouldModifyManagedTablePermissions,
- boolean dryRun) {
+ private static class RunOptions {
+ final String dbRegex;
+ final String tableRegex;
+ final String oldWarehouseRoot;
+ final TableMigrationOption migrationOption;
+ final boolean shouldModifyManagedTableLocation;
+ final boolean shouldModifyManagedTableOwner;
+ final boolean shouldModifyManagedTablePermissions;
+ final boolean dryRun;
+ final TableType tableType;
+ final int tablePoolSize;
+
+ RunOptions(String dbRegex,
+ String tableRegex,
+ String oldWarehouseRoot,
+ TableMigrationOption migrationOption,
+ boolean shouldModifyManagedTableLocation,
+ boolean shouldModifyManagedTableOwner,
+ boolean shouldModifyManagedTablePermissions,
+ boolean dryRun,
+ TableType tableType,
+ int tablePoolSize) {
super();
this.dbRegex = dbRegex;
this.tableRegex = tableRegex;
@@ -112,6 +116,67 @@ public class HiveStrictManagedMigration {
this.shouldModifyManagedTableOwner = shouldModifyManagedTableOwner;
this.shouldModifyManagedTablePermissions = shouldModifyManagedTablePermissions;
this.dryRun = dryRun;
+ this.tableType = tableType;
+ this.tablePoolSize = tablePoolSize;
+ }
+
+ public RunOptions setShouldModifyManagedTableLocation(boolean shouldModifyManagedTableLocation) {
+ return new RunOptions(
+ this.dbRegex,
+ this.tableRegex,
+ this.oldWarehouseRoot,
+ this.migrationOption,
+ shouldModifyManagedTableLocation,
+ this.shouldModifyManagedTableOwner,
+ this.shouldModifyManagedTablePermissions,
+ this.dryRun,
+ this.tableType,
+ this.tablePoolSize);
+ }
+
+ @Override
+ public String toString() {
+ return "RunOptions{" +
+ "dbRegex='" + dbRegex + '\'' +
+ ", tableRegex='" + tableRegex + '\'' +
+ ", oldWarehouseRoot='" + oldWarehouseRoot + '\'' +
+ ", migrationOption=" + migrationOption +
+ ", shouldModifyManagedTableLocation=" + shouldModifyManagedTableLocation +
+ ", shouldModifyManagedTableOwner=" + shouldModifyManagedTableOwner +
+ ", shouldModifyManagedTablePermissions=" + shouldModifyManagedTablePermissions +
+ ", dryRun=" + dryRun +
+ ", tableType=" + tableType +
+ ", tablePoolSize=" + tablePoolSize +
+ '}';
+ }
+ }
+
+ /**
+ * Owner, group and directory/file permissions to apply to migrated managed locations.
+ * Each field may be null when the corresponding modification option is disabled.
+ */
+ private static class OwnerPermsOptions {
+ final String ownerName;
+ final String groupName;
+ final FsPermission dirPerms;
+ final FsPermission filePerms;
+
+ OwnerPermsOptions(String owner, String group, FsPermission directoryPermissions, FsPermission filePermissions) {
+ ownerName = owner;
+ groupName = group;
+ dirPerms = directoryPermissions;
+ filePerms = filePermissions;
+ }
+ }
+
+ /**
+ * Outcome of validating the old warehouse root: whether managed table locations should really be moved,
+ * plus the current warehouse root path and HDFS encryption shim computed during the check (either may be null).
+ */
+ private static class WarehouseRootCheckResult {
+ final boolean shouldModifyManagedTableLocation;
+ final Path curWhRootPath;
+ final HadoopShims.HdfsEncryptionShim encryptionShim;
+
+ WarehouseRootCheckResult(boolean shouldModify, Path currentRoot, HadoopShims.HdfsEncryptionShim shim) {
+ shouldModifyManagedTableLocation = shouldModify;
+ curWhRootPath = currentRoot;
+ encryptionShim = shim;
}
}
@@ -136,7 +201,15 @@ public class HiveStrictManagedMigration {
int rc = 0;
HiveStrictManagedMigration migration = null;
try {
- migration = new HiveStrictManagedMigration(runOptions);
+ HiveConf conf = hiveConf == null ? new HiveConf() : hiveConf;
+ WarehouseRootCheckResult warehouseRootCheckResult = checkOldWarehouseRoot(runOptions, conf);
+ runOptions = runOptions.setShouldModifyManagedTableLocation(
+ warehouseRootCheckResult.shouldModifyManagedTableLocation);
+ boolean createExternalDirsForDbs = checkExternalWarehouseDir(conf);
+ OwnerPermsOptions ownerPermsOptions = checkOwnerPermsOptions(runOptions, conf);
+
+ migration = new HiveStrictManagedMigration(
+ conf, runOptions, createExternalDirsForDbs, ownerPermsOptions, warehouseRootCheckResult);
migration.run();
} catch (Exception err) {
LOG.error("Failed with error", err);
@@ -148,7 +221,9 @@ public class HiveStrictManagedMigration {
}
// TODO: Something is preventing the process from terminating after main(), adding exit() as hacky solution.
- System.exit(rc);
+ if (hiveConf == null) {
+ System.exit(rc);
+ }
}
static Options createOptions() {
@@ -156,66 +231,84 @@ public class HiveStrictManagedMigration {
// -hiveconf x=y
result.addOption(OptionBuilder
- .withValueSeparator()
- .hasArgs(2)
- .withArgName("property=value")
- .withLongOpt("hiveconf")
- .withDescription("Use value for given property")
- .create());
+ .withValueSeparator()
+ .hasArgs(2)
+ .withArgName("property=value")
+ .withLongOpt("hiveconf")
+ .withDescription("Use value for given property")
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("dryRun")
- .withDescription("Show what migration actions would be taken without actually running commands")
- .create());
+ .withLongOpt("dryRun")
+ .withDescription("Show what migration actions would be taken without actually running commands")
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("dbRegex")
- .withDescription("Regular expression to match database names on which this tool will be run")
- .hasArg()
- .create('d'));
+ .withLongOpt("dbRegex")
+ .withDescription("Regular expression to match database names on which this tool will be run")
+ .hasArg()
+ .create('d'));
result.addOption(OptionBuilder
- .withLongOpt("tableRegex")
- .withDescription("Regular expression to match table names on which this tool will be run")
- .hasArg()
- .create('t'));
+ .withLongOpt("tableRegex")
+ .withDescription("Regular expression to match table names on which this tool will be run")
+ .hasArg()
+ .create('t'));
result.addOption(OptionBuilder
- .withLongOpt("oldWarehouseRoot")
- .withDescription("Location of the previous warehouse root")
- .hasArg()
- .create());
+ .withLongOpt("oldWarehouseRoot")
+ .withDescription("Location of the previous warehouse root")
+ .hasArg()
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("migrationOption")
- .withDescription("Table migration option (automatic|external|managed|validate|none)")
- .hasArg()
- .create('m'));
+ .withLongOpt("migrationOption")
+ .withDescription("Table migration option (automatic|external|managed|validate|none)")
+ .hasArg()
+ .create('m'));
result.addOption(OptionBuilder
- .withLongOpt("shouldModifyManagedTableLocation")
- .withDescription("Whether managed tables should have their data moved from the old warehouse path to the current warehouse path")
- .create());
+ .withLongOpt("shouldModifyManagedTableLocation")
+ .withDescription("Whether managed tables should have their data moved from " +
+ "the old warehouse path to the current warehouse path")
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("shouldModifyManagedTableOwner")
- .withDescription("Whether managed tables should have their directory owners changed to the hive user")
- .create());
+ .withLongOpt("shouldModifyManagedTableOwner")
+ .withDescription("Whether managed tables should have their directory owners changed to the hive user")
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("shouldModifyManagedTablePermissions")
- .withDescription("Whether managed tables should have their directory permissions changed to conform to strict managed tables mode")
- .create());
+ .withLongOpt("shouldModifyManagedTablePermissions")
+ .withDescription("Whether managed tables should have their directory permissions changed to conform to " +
+ "strict managed tables mode")
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("modifyManagedTables")
- .withDescription("This setting enables the shouldModifyManagedTableLocation, shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions options")
- .create());
+ .withLongOpt("modifyManagedTables")
+ .withDescription("This setting enables the shouldModifyManagedTableLocation, " +
+ "shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions options")
+ .create());
result.addOption(OptionBuilder
- .withLongOpt("help")
- .withDescription("print help message")
- .create('h'));
+ .withLongOpt("help")
+ .withDescription("print help message")
+ .create('h'));
+
+ result.addOption(OptionBuilder
+ .withLongOpt("tablePoolSize")
+ .withDescription("Number of threads to process tables.")
+ .hasArg()
+ .create("tn"));
+
+ result.addOption(OptionBuilder
+ .withLongOpt("tableType")
+ .withDescription(String.format("Table type to match tables on which this tool will be run. " +
+ "Possible values: %s Default: all tables",
+ Arrays.stream(TableType.values()).map(Enum::name).collect(Collectors.joining("|"))))
+ .hasArg()
+ .withArgName("table type")
+ .create("tt"));
return result;
}
@@ -251,6 +344,22 @@ public class HiveStrictManagedMigration {
String oldWarehouseRoot = cli.getOptionValue("oldWarehouseRoot");
boolean dryRun = cli.hasOption("dryRun");
+ String tableTypeText = cli.getOptionValue("tableType");
+
+ int defaultPoolSize = Runtime.getRuntime().availableProcessors() / 2;
+ if (defaultPoolSize < 1) {
+ defaultPoolSize = 1;
+ }
+
+ int databasePoolSize = getIntOptionValue(cli, "databasePoolSize", defaultPoolSize);
+ if (databasePoolSize < 1) {
+ throw new IllegalArgumentException("Please specify a positive integer option value for databasePoolSize");
+ }
+ int tablePoolSize = getIntOptionValue(cli, "tablePoolSize", defaultPoolSize);
+ if (tablePoolSize < 1) {
+ throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize");
+ }
+
RunOptions runOpts = new RunOptions(
dbRegex,
tableRegex,
@@ -259,75 +368,137 @@ public class HiveStrictManagedMigration {
shouldModifyManagedTableLocation,
shouldModifyManagedTableOwner,
shouldModifyManagedTablePermissions,
- dryRun);
+ dryRun,
+ tableTypeText == null ? null : TableType.valueOf(tableTypeText),
+ tablePoolSize);
return runOpts;
}
- private RunOptions runOptions;
- private HiveConf conf;
- private HiveMetaStoreClient hms;
- private boolean failedValidationChecks;
- private boolean failuresEncountered;
- private Warehouse wh;
- private Warehouse oldWh;
- private String ownerName;
- private String groupName;
- private FsPermission dirPerms;
- private FsPermission filePerms;
- private boolean createExternalDirsForDbs;
- Path curWhRootPath;
- private HadoopShims.HdfsEncryptionShim encryptionShim;
-
- HiveStrictManagedMigration(RunOptions runOptions) {
- this.runOptions = runOptions;
- this.conf = new HiveConf();
+ /**
+ * Reads an integer-valued command line option.
+ * @return the parsed option value, or defaultValue when the option was not given
+ * @throws IllegalArgumentException when the option was given but is not a valid integer
+ */
+ private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) {
+ if (!commandLine.hasOption(optionName)) {
+ return defaultValue;
+ }
+ String rawValue = commandLine.getOptionValue(optionName);
+ try {
+ return Integer.parseInt(rawValue);
+ } catch (NumberFormatException parseError) {
+ throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, parseError);
+ }
}
- void run() throws Exception {
- wh = new Warehouse(conf);
- checkOldWarehouseRoot();
- checkExternalWarehouseDir();
- checkOwnerPermsOptions();
-
- hms = new HiveMetaStoreClient(conf);//MetaException
- try {
- List<String> databases = hms.getAllDatabases();//TException
- LOG.info("Found {} databases", databases.size());
- for (String dbName : databases) {
- if (dbName.matches(runOptions.dbRegex)) {
- try {
- processDatabase(dbName);
- } catch (Exception err) {
- LOG.error("Error processing database " + dbName, err);
- failuresEncountered = true;
- }
+ private final HiveConf conf;
+ private RunOptions runOptions;
+ private final boolean createExternalDirsForDbs;
+ private final Path curWhRootPath;
+ private final HadoopShims.HdfsEncryptionShim encryptionShim;
+ private final String ownerName;
+ private final String groupName;
+ private final FsPermission dirPerms;
+ private final FsPermission filePerms;
+
+ private CloseableThreadLocal<HiveMetaStoreClient> hms;
+ private ThreadLocal<Warehouse> wh;
+ private ThreadLocal<Warehouse> oldWh;
+ private CloseableThreadLocal<HiveUpdater> hiveUpdater;
+
+ private AtomicBoolean failuresEncountered;
+ private AtomicBoolean failedValidationChecks;
+
+ /**
+ * Creates a migration run from pre-computed settings; in main() the owner/permission and warehouse-root
+ * arguments come from checkOwnerPermsOptions and checkOldWarehouseRoot.
+ * Metastore clients, Warehouse instances and HiveUpdaters are not created here: each one is created lazily,
+ * once per calling thread, the first time that thread calls get() on the corresponding holder.
+ */
+ HiveStrictManagedMigration(HiveConf conf, RunOptions runOptions, boolean createExternalDirsForDbs,
+ OwnerPermsOptions ownerPermsOptions, WarehouseRootCheckResult warehouseRootCheckResult) {
+ this.conf = conf;
+ this.runOptions = runOptions;
+ this.createExternalDirsForDbs = createExternalDirsForDbs;
+ this.ownerName = ownerPermsOptions.ownerName;
+ this.groupName = ownerPermsOptions.groupName;
+ this.dirPerms = ownerPermsOptions.dirPerms;
+ this.filePerms = ownerPermsOptions.filePerms;
+ this.curWhRootPath = warehouseRootCheckResult.curWhRootPath;
+ this.encryptionShim = warehouseRootCheckResult.encryptionShim;
+
+ // One metastore client per thread; all of them can be closed together via hms.close().
+ this.hms = new CloseableThreadLocal<>(() -> {
+ try {
+ HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(conf);
+ // When hiveConf is set (main() then uses it instead of a new HiveConf and skips System.exit),
+ // also start a SessionState on this thread and apply the authorization policy.
+ if (hiveConf != null) {
+ SessionState ss = SessionState.start(conf);
+ ss.applyAuthorizationPolicy();
}
+ return hiveMetaStoreClient;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- LOG.info("Done processing databases.");
- } finally {
- hms.close();
+ }, runOptions.tablePoolSize);
+ wh = ThreadLocal.withInitial(() -> {
+ try {
+ return new Warehouse(conf);
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // oldWh stays null unless managed table locations are going to be moved.
+ if (runOptions.shouldModifyManagedTableLocation) {
+ Configuration oldConf = new Configuration(conf);
+ HiveConf.setVar(oldConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
+
+ oldWh = ThreadLocal.withInitial(() -> {
+ try {
+ return new Warehouse(oldConf);
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
+ // One HiveUpdater per thread; all of them can be closed together via hiveUpdater.close().
+ this.hiveUpdater = new CloseableThreadLocal<>(() -> {
+ try {
+ return new HiveUpdater(conf, true);
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+ }, runOptions.tablePoolSize);
- if (failuresEncountered) {
+ // Atomic so that the flags can be set safely from the parallel table-processing threads.
+ this.failuresEncountered = new AtomicBoolean(false);
+ this.failedValidationChecks = new AtomicBoolean(false);
+ }
+
+ /**
+ * Migrates every database matching the dbRegex option. Databases are handled one after another, while
+ * the tables of each database are processed in parallel on a pool of tablePoolSize threads that lives
+ * only for the duration of this call.
+ * @throws HiveException if any database or table failed to process, or failed the strict managed table
+ * validation checks
+ */
+ void run() throws Exception {
+ LOG.info("Starting with {}", runOptions);
+
+ List<String> databases = hms.get().getDatabases(runOptions.dbRegex); //TException
+ LOG.info("Found {} databases", databases.size());
+ ForkJoinPool tablePool = new ForkJoinPool(
+ runOptions.tablePoolSize,
+ new NamedForkJoinWorkerThreadFactory("Table-"),
+ getUncaughtExceptionHandler(),
+ false);
+ try {
+ databases.forEach(dbName -> processDatabase(dbName, tablePool));
+ } finally {
+ // The pool is created per run; release its worker threads even if processing failed.
+ tablePool.shutdown();
+ }
+ LOG.info("Done processing databases.");
+
+ if (failuresEncountered.get()) {
throw new HiveException("One or more failures encountered during processing.");
}
- if (failedValidationChecks) {
+ if (failedValidationChecks.get()) {
throw new HiveException("One or more tables failed validation checks for strict managed table mode.");
}
}
- void checkOldWarehouseRoot() throws IOException, MetaException {
+ /** Builds a handler that logs, at ERROR level, any throwable that terminates a thread it is installed on. */
+ private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ Thread.UncaughtExceptionHandler logFailure = (failedThread, failure) ->
+ LOG.error(String.format("Thread %s exited with error", failedThread.getName()), failure);
+ return logFailure;
+ }
+
+ static WarehouseRootCheckResult checkOldWarehouseRoot(RunOptions runOptions, HiveConf conf) throws IOException {
+ boolean shouldModifyManagedTableLocation = runOptions.shouldModifyManagedTableLocation;
+ Path curWhRootPath = null;
+ HadoopShims.HdfsEncryptionShim encryptionShim = null;
+
if (runOptions.shouldModifyManagedTableLocation) {
if (runOptions.oldWarehouseRoot == null) {
LOG.info("oldWarehouseRoot is not specified. Disabling shouldModifyManagedTableLocation");
- runOptions.shouldModifyManagedTableLocation = false;
+ shouldModifyManagedTableLocation = false;
} else {
String curWarehouseRoot = HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREWAREHOUSE);
if (arePathsEqual(conf, runOptions.oldWarehouseRoot, curWarehouseRoot)) {
LOG.info("oldWarehouseRoot is the same as the current warehouse root {}."
+ " Disabling shouldModifyManagedTableLocation",
runOptions.oldWarehouseRoot);
- runOptions.shouldModifyManagedTableLocation = false;
+ shouldModifyManagedTableLocation = false;
} else {
Path oldWhRootPath = new Path(runOptions.oldWarehouseRoot);
curWhRootPath = new Path(curWarehouseRoot);
@@ -339,18 +510,18 @@ public class HiveStrictManagedMigration {
LOG.info("oldWarehouseRoot {} has a different FS than the current warehouse root {}."
+ " Disabling shouldModifyManagedTableLocation",
runOptions.oldWarehouseRoot, curWarehouseRoot);
- runOptions.shouldModifyManagedTableLocation = false;
+ shouldModifyManagedTableLocation = false;
} else {
if (!isHdfs(oldWhRootFs)) {
LOG.info("Warehouse is using non-HDFS FileSystem {}. Disabling shouldModifyManagedTableLocation",
oldWhRootFs.getUri());
- runOptions.shouldModifyManagedTableLocation = false;
+ shouldModifyManagedTableLocation = false;
} else {
encryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(oldWhRootFs, conf);
if (!hasEquivalentEncryption(encryptionShim, oldWhRootPath, curWhRootPath)) {
LOG.info("oldWarehouseRoot {} and current warehouse root {} have different encryption zones." +
" Disabling shouldModifyManagedTableLocation", oldWhRootPath, curWhRootPath);
- runOptions.shouldModifyManagedTableLocation = false;
+ shouldModifyManagedTableLocation = false;
}
}
}
@@ -358,14 +529,15 @@ public class HiveStrictManagedMigration {
}
}
- if (runOptions.shouldModifyManagedTableLocation) {
- Configuration oldWhConf = new Configuration(conf);
- HiveConf.setVar(oldWhConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
- oldWh = new Warehouse(oldWhConf);
- }
+ return new WarehouseRootCheckResult(shouldModifyManagedTableLocation, curWhRootPath, encryptionShim);
}
- void checkOwnerPermsOptions() {
+ static OwnerPermsOptions checkOwnerPermsOptions(RunOptions runOptions, HiveConf conf) {
+ String ownerName = null;
+ String groupName = null;
+ FsPermission dirPerms = null;
+ FsPermission filePerms = null;
+
if (runOptions.shouldModifyManagedTableOwner) {
ownerName = conf.get("strict.managed.tables.migration.owner", "hive");
groupName = conf.get("strict.managed.tables.migration.group", null);
@@ -380,61 +552,72 @@ public class HiveStrictManagedMigration {
filePerms = new FsPermission(filePermsString);
}
}
+
+ return new OwnerPermsOptions(ownerName, groupName, dirPerms, filePerms);
}
- void checkExternalWarehouseDir() {
+ /**
+ * @return true if HIVE_METASTORE_WAREHOUSE_EXTERNAL is configured with a non-empty value
+ */
+ static boolean checkExternalWarehouseDir(HiveConf conf) {
String externalWarehouseDir = conf.getVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL);
- if (externalWarehouseDir != null && !externalWarehouseDir.isEmpty()) {
- createExternalDirsForDbs = true;
- }
+ boolean hasExternalWarehouseDir = externalWarehouseDir != null && !externalWarehouseDir.isEmpty();
+ return hasExternalWarehouseDir;
}
- void processDatabase(String dbName) throws IOException, HiveException, MetaException, TException {
- LOG.info("Processing database {}", dbName);
- Database dbObj = hms.getDatabase(dbName);
+ /**
+ * Migrates a single database: optionally moves its default location, optionally creates its external
+ * directory, then processes its tables in parallel on tablePool and waits for them to finish.
+ * The database location is only updated when every table was processed without errors.
+ * Checked errors (including interruption) are logged and recorded in failuresEncountered instead of thrown.
+ */
+ void processDatabase(String dbName, ForkJoinPool tablePool) {
+ try {
+ LOG.info("Processing database {}", dbName);
+ Database dbObj = hms.get().getDatabase(dbName);
- boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj);
- if (modifyDefaultManagedLocation) {
- Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName);
+ boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj);
+ if (modifyDefaultManagedLocation) {
+ Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName);
- LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
- if (!runOptions.dryRun) {
- FileSystem fs = newDefaultDbLocation.getFileSystem(conf);
- FileUtils.mkdir(fs, newDefaultDbLocation, conf);
- // Set appropriate owner/perms of the DB dir only, no need to recurse
- checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
- ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
+ LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
+ if (!runOptions.dryRun) {
+ FileSystem fs = newDefaultDbLocation.getFileSystem(conf);
+ FileUtils.mkdir(fs, newDefaultDbLocation, conf);
+ // Set appropriate owner/perms of the DB dir only, no need to recurse
+ checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
+ ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
+ }
}
- }
- if (createExternalDirsForDbs) {
- createExternalDbDir(dbObj);
- }
+ if (createExternalDirsForDbs) {
+ createExternalDbDir(dbObj);
+ }
- boolean errorsInThisDb = false;
- List<String> tableNames = hms.getTables(dbName, runOptions.tableRegex);
- for (String tableName : tableNames) {
- // If we did not change the DB location, there is no need to move the table directories.
- try {
- processTable(dbObj, tableName, modifyDefaultManagedLocation);
- } catch (Exception err) {
- LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), err);
- failuresEncountered = true;
- errorsInThisDb = true;
+ List<String> tableNames;
+ if (runOptions.tableType == null) {
+ tableNames = hms.get().getTables(dbName, runOptions.tableRegex);
+ LOG.debug("found {} tables in {}", tableNames.size(), dbName);
+ } else {
+ tableNames = hms.get().getTables(dbName, runOptions.tableRegex, runOptions.tableType);
+ LOG.debug("found {} {}s in {}", tableNames.size(), runOptions.tableType.name(), dbName);
}
- }
- // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
- if (modifyDefaultManagedLocation) {
+ // processTable returns true on success; the database has errors if any table did not succeed.
+ boolean errorsInThisDb = !tablePool.submit(() -> tableNames.parallelStream()
+ .map(tableName -> processTable(dbObj, tableName, modifyDefaultManagedLocation))
+ .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2)).get();
if (errorsInThisDb) {
- LOG.error("Not updating database location for {} since an error was encountered. The migration must be run again for this database.",
- dbObj.getName());
- } else {
- Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName);
- // dbObj after this call would have the new DB location.
- // Keep that in mind if anything below this requires the old DB path.
- getHiveUpdater().updateDbLocation(dbObj, newDefaultDbLocation);
+ failuresEncountered.set(true);
+ }
+
+ // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
+ if (modifyDefaultManagedLocation) {
+ if (errorsInThisDb) {
+ LOG.error("Not updating database location for {} since an error was encountered. " +
+ "The migration must be run again for this database.", dbObj.getName());
+ } else {
+ Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName);
+ // dbObj after this call would have the new DB location.
+ // Keep that in mind if anything below this requires the old DB path.
+ hiveUpdater.get().updateDbLocation(dbObj, newDefaultDbLocation);
+ }
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("Cancel processing " + dbName, e);
+ // The database was not fully migrated (its location was not updated), so run() must not report success.
+ failuresEncountered.set(true);
+ } catch (TException | IOException | HiveException | ExecutionException ex) {
+ LOG.error("Error processing database " + dbName, ex);
+ failuresEncountered.set(true);
}
}
@@ -464,44 +647,55 @@ public class HiveStrictManagedMigration {
return false;
}
- void processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation)
- throws HiveException, IOException, TException {
- String dbName = dbObj.getName();
- LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
+ boolean processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation) {
+ try {
+ String dbName = dbObj.getName();
+ LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
- Table tableObj = hms.getTable(dbName, tableName);
- TableType tableType = TableType.valueOf(tableObj.getTableType());
+ Table tableObj = hms.get().getTable(dbName, tableName);
+ TableType tableType = TableType.valueOf(tableObj.getTableType());
- TableMigrationOption migrationOption = runOptions.migrationOption;
- if (migrationOption == TableMigrationOption.AUTOMATIC) {
- migrationOption = determineMigrationTypeAutomatically(tableObj, tableType, ownerName, conf, hms, null);
- }
+ TableMigrationOption migrationOption = runOptions.migrationOption;
+ if (migrationOption == TableMigrationOption.AUTOMATIC) {
+ migrationOption = determineMigrationTypeAutomatically(
+ tableObj, tableType, ownerName, conf, hms.get(), null);
+ }
- failedValidationChecks = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
- getHiveUpdater(), hms, conf);
+ boolean failedValidationCheck = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
+ hiveUpdater.get(), hms.get(), conf);
- if (!failedValidationChecks && (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE)) {
- Path tablePath = new Path(tableObj.getSd().getLocation());
- if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) {
- Path newTablePath = wh.getDnsPath(
- new Path(wh.getDefaultDatabasePath(dbName),
- MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
- moveTableData(dbObj, tableObj, newTablePath);
- if (!runOptions.dryRun) {
- // File ownership/permission checks should be done on the new table path.
- tablePath = newTablePath;
- }
+ if (failedValidationCheck) {
+ this.failedValidationChecks.set(true);
+ return true;
}
- if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
- FileSystem fs = tablePath.getFileSystem(conf);
- if (isHdfs(fs)) {
- // TODO: what about partitions not in the default location?
- checkAndSetFileOwnerPermissions(fs, tablePath,
- ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
+ if (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
+ Path tablePath = new Path(tableObj.getSd().getLocation());
+ if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) {
+ Path newTablePath = wh.get().getDnsPath(
+ new Path(wh.get().getDefaultDatabasePath(dbName),
+ MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
+ moveTableData(dbObj, tableObj, newTablePath);
+ if (!runOptions.dryRun) {
+ // File ownership/permission checks should be done on the new table path.
+ tablePath = newTablePath;
+ }
+ }
+
+ if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ if (isHdfs(fs)) {
+ // TODO: what about partitions not in the default location?
+ checkAndSetFileOwnerPermissions(fs, tablePath,
+ ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
+ }
}
}
+ } catch (Exception ex) {
+ LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), ex);
+ return false;
}
+ return true;
}
boolean shouldModifyDatabaseLocation(Database dbObj) throws IOException, MetaException {
@@ -510,7 +704,7 @@ public class HiveStrictManagedMigration {
// Check if the database location is in the default location based on the old warehouse root.
// If so then change the database location to the default based on the current warehouse root.
String dbLocation = dbObj.getLocationUri();
- Path oldDefaultDbLocation = oldWh.getDefaultDatabasePath(dbName);
+ Path oldDefaultDbLocation = oldWh.get().getDefaultDatabasePath(dbName);
if (arePathsEqual(conf, dbLocation, oldDefaultDbLocation.toString())) {
if (hasEquivalentEncryption(encryptionShim, oldDefaultDbLocation, curWhRootPath)) {
return true;
@@ -529,7 +723,7 @@ public class HiveStrictManagedMigration {
// If so then change the table location to the default based on the current warehouse root.
// The existing table directory will also be moved to the new default database directory.
String tableLocation = tableObj.getSd().getLocation();
- Path oldDefaultTableLocation = oldWh.getDefaultTablePath(dbObj, tableObj.getTableName());
+ Path oldDefaultTableLocation = oldWh.get().getDefaultTablePath(dbObj, tableObj.getTableName());
if (arePathsEqual(conf, tableLocation, oldDefaultTableLocation.toString())) {
if (hasEquivalentEncryption(encryptionShim, oldDefaultTableLocation, curWhRootPath)) {
return true;
@@ -545,7 +739,7 @@ public class HiveStrictManagedMigration {
throws IOException, MetaException {
String tableName = tableObj.getTableName();
String partLocation = partObj.getSd().getLocation();
- Path oldDefaultPartLocation = oldWh.getDefaultPartitionPath(dbObj, tableObj, partSpec);
+ Path oldDefaultPartLocation = oldWh.get().getDefaultPartitionPath(dbObj, tableObj, partSpec);
if (arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString())) {
if (hasEquivalentEncryption(encryptionShim, oldDefaultPartLocation, curWhRootPath)) {
return true;
@@ -558,7 +752,7 @@ public class HiveStrictManagedMigration {
}
void createExternalDbDir(Database dbObj) throws IOException, MetaException {
- Path externalTableDbPath = wh.getDefaultExternalDatabasePath(dbObj.getName());
+ Path externalTableDbPath = wh.get().getDefaultExternalDatabasePath(dbObj.getName());
FileSystem fs = externalTableDbPath.getFileSystem(conf);
if (!fs.exists(externalTableDbPath)) {
String dbOwner = ownerName;
@@ -621,19 +815,19 @@ public class HiveStrictManagedMigration {
// locations to be in sync.
if (isPartitionedTable(tableObj)) {
- List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
+ List<String> partNames = hms.get().listPartitionNames(dbName, tableName, Short.MAX_VALUE);
// TODO: Fetch partitions in batches?
// TODO: Threadpool to process partitions?
for (String partName : partNames) {
- Partition partObj = hms.getPartition(dbName, tableName, partName);
+ Partition partObj = hms.get().getPartition(dbName, tableName, partName);
Map<String, String> partSpec =
Warehouse.makeSpecFromValues(tableObj.getPartitionKeys(), partObj.getValues());
if (shouldModifyPartitionLocation(dbObj, tableObj, partObj, partSpec)) {
// Table directory (which includes the partition directory) has already been moved,
// just update the partition location in the metastore.
if (!runOptions.dryRun) {
- Path newPartPath = wh.getPartitionPath(newTablePath, partSpec);
- getHiveUpdater().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
+ Path newPartPath = wh.get().getPartitionPath(newTablePath, partSpec);
+ hiveUpdater.get().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
}
}
}
@@ -642,7 +836,7 @@ public class HiveStrictManagedMigration {
// Finally update the table location. This would prevent this tool from processing this table again
// on subsequent runs of the migration.
if (!runOptions.dryRun) {
- getHiveUpdater().updateTableLocation(tableObj, newTablePath);
+ hiveUpdater.get().updateTableLocation(tableObj, newTablePath);
}
}
@@ -907,6 +1101,7 @@ public class HiveStrictManagedMigration {
}
void cleanup() {
+ hms.close();
if (hiveUpdater != null) {
runAndLogErrors(() -> hiveUpdater.close());
hiveUpdater = null;
@@ -917,13 +1112,6 @@ public class HiveStrictManagedMigration {
return new HiveUpdater(conf, false);
}
- HiveUpdater getHiveUpdater() throws HiveException {
- if (hiveUpdater == null) {
- hiveUpdater = new HiveUpdater(conf, true);
- }
- return hiveUpdater;
- }
-
private static final class TxnCtx {
public final long writeId;
public final String validWriteIds;
@@ -936,7 +1124,7 @@ public class HiveStrictManagedMigration {
}
}
- public static class HiveUpdater {
+ private static class HiveUpdater implements AutoCloseable {
Hive hive;
boolean doFileRename;
@@ -946,9 +1134,10 @@ public class HiveStrictManagedMigration {
doFileRename = fileRename;
}
- void close() {
+ @Override
+ public void close() {
if (hive != null) {
- runAndLogErrors(() -> Hive.closeCurrent());
+ runAndLogErrors(Hive::closeCurrent);
hive = null;
}
}
@@ -1122,8 +1311,6 @@ public class HiveStrictManagedMigration {
}
}
- HiveUpdater hiveUpdater;
-
interface ThrowableRunnable {
void run() throws Exception;
}
@@ -1157,7 +1344,7 @@ public class HiveStrictManagedMigration {
}
static boolean isHdfs(FileSystem fs) {
- return fs.getScheme().equals("hdfs");
+ return scheme.equals(fs.getScheme());
}
static String getQualifiedName(Table tableObj) {
@@ -1335,4 +1522,12 @@ public class HiveStrictManagedMigration {
}
return true;
}
+
+ /**
+ * Can be set from tests when the configuration needs values other than the defaults.
+ */
+ @VisibleForTesting
+ static HiveConf hiveConf = null;
+ @VisibleForTesting
+ static String scheme = "hdfs";
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
new file mode 100644
index 0000000..5b6eecc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+
+/**
+ * {@link ForkJoinPool.ForkJoinWorkerThreadFactory} that lets the JDK default factory create each worker
+ * and then prepends a caller-supplied prefix to the name the default factory assigned.
+ */
+public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+  private final String namePrefix;
+
+  NamedForkJoinWorkerThreadFactory(String namePrefix) {
+    this.namePrefix = namePrefix;
+  }
+
+  @Override
+  public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+    final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+    thread.setName(namePrefix + thread.getName());
+    return thread;
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 5f39fdc..7039b89 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hive.ql;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -40,13 +47,6 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public abstract class TxnCommandsBaseForTests {
private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class);
//bucket count for test tables; set it to 1 for easier debugging
@@ -55,7 +55,7 @@ public abstract class TxnCommandsBaseForTests {
public TestName testName = new TestName();
protected HiveConf hiveConf;
Driver d;
- enum Table {
+ public enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
ACIDTBL2("acidTbl2"),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java
new file mode 100644
index 0000000..25a59b7
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class CloseableThreadLocalTest {
+
+  /** Minimal {@link AutoCloseable} that only records whether {@link #close()} has been called. */
+  private static class AutoCloseableStub implements AutoCloseable {
+    private boolean closed;
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() throws Exception {
+      closed = true;
+    }
+  }
+
+  @Test
+  public void testResourcesAreInitiallyNotClosed() {
+    CloseableThreadLocal<AutoCloseableStub> threadLocal = new CloseableThreadLocal<>(AutoCloseableStub::new, 1);
+    AutoCloseableStub stub = threadLocal.get();
+
+    assertThat(stub.isClosed(), is(false));
+  }
+
+  @Test
+  public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> threadLocal = new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+    // One instance is obtained on the supplyAsync thread, the other on the test thread.
+    AutoCloseableStub fromAsyncThread = CompletableFuture.supplyAsync(threadLocal::get).get();
+    AutoCloseableStub fromTestThread = threadLocal.get();
+
+    threadLocal.close();
+
+    assertThat(fromAsyncThread.isClosed(), is(true));
+    assertThat(fromTestThread.isClosed(), is(true));
+  }
+
+  @Test
+  public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+    CloseableThreadLocal<AutoCloseableStub> threadLocal = new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    // Both lookups run on the test thread.
+    AutoCloseableStub first = threadLocal.get();
+    AutoCloseableStub second = threadLocal.get();
+
+    assertThat(first, is(second));
+  }
+
+  @Test
+  public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> threadLocal = new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+    // Same two-thread setup as the close test, but compare identities instead of closing.
+    AutoCloseableStub fromAsyncThread = CompletableFuture.supplyAsync(threadLocal::get).get();
+    AutoCloseableStub fromTestThread = threadLocal.get();
+
+    assertThat(fromAsyncThread, is(not(fromTestThread)));
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java
new file mode 100644
index 0000000..057135b
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBL;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBLPART;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDNONBUCKET;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL2;
+
+import java.io.File;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHiveStrictManagedMigration extends TxnCommandsBaseForTests {
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+      File.separator + TestHiveStrictManagedMigration.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+
+  @Test
+  public void testUpgrade() throws Exception {
+    int[][] data = {{1, 2}, {3, 4}, {5, 6}};
+    runStatementOnDriver("DROP TABLE IF EXISTS test.TAcid");
+    runStatementOnDriver("DROP DATABASE IF EXISTS test");
+    runStatementOnDriver("CREATE DATABASE test");
+    runStatementOnDriver("CREATE TABLE test.TAcid (a int, b int) CLUSTERED BY (b) INTO 2 BUCKETS STORED AS orc"
+        + " TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO test.TAcid" + makeValuesClause(data));
+    runStatementOnDriver("CREATE EXTERNAL TABLE texternal (a int, b int)");
+
+    String oldWarehouse = getWarehouseDir();
+    String[] args = {"--hiveconf", "hive.strict.managed.tables=true", "-m", "automatic", "--modifyManagedTables",
+        "--oldWarehouseRoot", oldWarehouse};
+    HiveConf newConf = new HiveConf(hiveConf);
+    File newWarehouseDir = new File(getTestDataDir(), "newWarehouse");
+    newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, newWarehouseDir.getAbsolutePath());
+    newConf.set("strict.managed.tables.migration.owner", System.getProperty("user.name"));
+    // Static test hooks: use newConf and treat the local "file" FS like HDFS; always restore the declared defaults.
+    HiveStrictManagedMigration.hiveConf = newConf;
+    HiveStrictManagedMigration.scheme = "file";
+    try {
+      HiveStrictManagedMigration.main(args);
+    } finally {
+      HiveStrictManagedMigration.hiveConf = null;
+      HiveStrictManagedMigration.scheme = "hdfs";
+    }
+    Assert.assertTrue(newWarehouseDir.exists());
+    Assert.assertTrue(new File(newWarehouseDir, ACIDTBL.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, ACIDTBLPART.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, NONACIDNONBUCKET.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL2.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(new File(newWarehouseDir, "test.db"), "tacid").exists());
+    Assert.assertTrue(new File(oldWarehouse, "texternal").exists());
+  }
+
+  @Override
+  protected String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+}