Posted to commits@hive.apache.org by kg...@apache.org on 2019/07/15 10:41:15 UTC

[hive] 04/05: HIVE-21965: Implement parallel processing in HiveStrictManagedMigration (Krisztian Kasa via Zoltan Haindrich)

This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit cff3c626e201f1ab6efb2fd58a28fa80ccd83104
Author: Krisztian Kasa <kk...@cloudera.com>
AuthorDate: Mon Jul 15 11:32:50 2019 +0200

    HIVE-21965: Implement parallel processing in HiveStrictManagedMigration (Krisztian Kasa via Zoltan Haindrich)
    
    Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
 .../hadoop/hive/ql/util/CloseableThreadLocal.java  |  59 ++
 .../hive/ql/util/HiveStrictManagedMigration.java   | 629 ++++++++++++++-------
 .../ql/util/NamedForkJoinWorkerThreadFactory.java  |  40 ++
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |  16 +-
 .../hive/ql/util/CloseableThreadLocalTest.java     |  86 +++
 .../ql/util/TestHiveStrictManagedMigration.java    |  78 +++
 6 files changed, 683 insertions(+), 225 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java
new file mode 100644
index 0000000..efffd38
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides functionality similar to {@link ThreadLocal}.
+ * In addition, it provides a close function that cleans up the resource in every thread where it was initialized.
+ * @param <T> - type of resource, it must implement {@link AutoCloseable}
+ */
+public class CloseableThreadLocal<T extends AutoCloseable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+  private final ConcurrentHashMap<Thread, T> threadLocalMap;
+  private final Supplier<T> initialValue;
+
+  public CloseableThreadLocal(Supplier<T> initialValue, int poolSize) {
+    this.initialValue = initialValue;
+    threadLocalMap = new ConcurrentHashMap<>(poolSize);
+  }
+
+  public T get() {
+    return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+  }
+
+  public void close() {
+    threadLocalMap.values().forEach(this::closeQuietly);
+  }
+
+  private void closeQuietly(AutoCloseable autoCloseable) {
+    try {
+      autoCloseable.close();
+    } catch (Exception e) {
+      LOG.warn("Error while closing resource.", e);
+    }
+  }
+}
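
A minimal usage sketch of the new class (Resource is a hypothetical stand-in for a per-thread
resource such as a metastore client; assumes CloseableThreadLocal is on the classpath):

    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.hive.ql.util.CloseableThreadLocal;

    public class CloseableThreadLocalDemo {
      // Hypothetical per-thread resource.
      static class Resource implements AutoCloseable {
        void use() { System.out.println("used by " + Thread.currentThread().getName()); }
        @Override public void close() { System.out.println("closed"); }
      }

      public static void main(String[] args) {
        CloseableThreadLocal<Resource> local = new CloseableThreadLocal<>(Resource::new, 2);
        // Each thread lazily creates its own Resource on first get().
        CompletableFuture.runAsync(() -> local.get().use()).join();
        local.get().use();
        // A single close() call closes the instances created in all threads.
        local.close();
      }
    }
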
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
index 80025b7..42c4158 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
@@ -19,10 +19,15 @@
 package org.apache.hadoop.hive.ql.util;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -39,21 +44,17 @@ import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.cli.CommonCliOptions;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils;
@@ -61,8 +62,6 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.HiveParser.switchDatabaseStatement_return;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -71,6 +70,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 public class HiveStrictManagedMigration {
@@ -85,24 +85,28 @@ public class HiveStrictManagedMigration {
     MANAGED    // Migrate tables as managed transactional tables
   }
 
-  static class RunOptions {
-    String dbRegex;
-    String tableRegex;
-    String oldWarehouseRoot;
-    TableMigrationOption migrationOption;
-    boolean shouldModifyManagedTableLocation;
-    boolean shouldModifyManagedTableOwner;
-    boolean shouldModifyManagedTablePermissions;
-    boolean dryRun;
-
-    public RunOptions(String dbRegex,
-        String tableRegex,
-        String oldWarehouseRoot,
-        TableMigrationOption migrationOption,
-        boolean shouldModifyManagedTableLocation,
-        boolean shouldModifyManagedTableOwner,
-        boolean shouldModifyManagedTablePermissions,
-        boolean dryRun) {
+  private static class RunOptions {
+    final String dbRegex;
+    final String tableRegex;
+    final String oldWarehouseRoot;
+    final TableMigrationOption migrationOption;
+    final boolean shouldModifyManagedTableLocation;
+    final boolean shouldModifyManagedTableOwner;
+    final boolean shouldModifyManagedTablePermissions;
+    final boolean dryRun;
+    final TableType tableType;
+    final int tablePoolSize;
+
+    RunOptions(String dbRegex,
+               String tableRegex,
+               String oldWarehouseRoot,
+               TableMigrationOption migrationOption,
+               boolean shouldModifyManagedTableLocation,
+               boolean shouldModifyManagedTableOwner,
+               boolean shouldModifyManagedTablePermissions,
+               boolean dryRun,
+               TableType tableType,
+               int tablePoolSize) {
       super();
       this.dbRegex = dbRegex;
       this.tableRegex = tableRegex;
@@ -112,6 +116,67 @@ public class HiveStrictManagedMigration {
       this.shouldModifyManagedTableOwner = shouldModifyManagedTableOwner;
       this.shouldModifyManagedTablePermissions = shouldModifyManagedTablePermissions;
       this.dryRun = dryRun;
+      this.tableType = tableType;
+      this.tablePoolSize = tablePoolSize;
+    }
+
+    public RunOptions setShouldModifyManagedTableLocation(boolean shouldModifyManagedTableLocation) {
+      return new RunOptions(
+              this.dbRegex,
+              this.tableRegex,
+              this.oldWarehouseRoot,
+              this.migrationOption,
+              shouldModifyManagedTableLocation,
+              this.shouldModifyManagedTableOwner,
+              this.shouldModifyManagedTablePermissions,
+              this.dryRun,
+              this.tableType,
+              this.tablePoolSize);
+    }
+
+    @Override
+    public String toString() {
+      return "RunOptions{" +
+              "dbRegex='" + dbRegex + '\'' +
+              ", tableRegex='" + tableRegex + '\'' +
+              ", oldWarehouseRoot='" + oldWarehouseRoot + '\'' +
+              ", migrationOption=" + migrationOption +
+              ", shouldModifyManagedTableLocation=" + shouldModifyManagedTableLocation +
+              ", shouldModifyManagedTableOwner=" + shouldModifyManagedTableOwner +
+              ", shouldModifyManagedTablePermissions=" + shouldModifyManagedTablePermissions +
+              ", dryRun=" + dryRun +
+              ", tableType=" + tableType +
+              ", tablePoolSize=" + tablePoolSize +
+              '}';
+    }
+  }
+
+  private static class OwnerPermsOptions {
+    final String ownerName;
+    final String groupName;
+    final FsPermission dirPerms;
+    final FsPermission filePerms;
+
+    OwnerPermsOptions(String ownerName, String groupName, FsPermission dirPerms, FsPermission filePerms) {
+      this.ownerName = ownerName;
+      this.groupName = groupName;
+      this.dirPerms = dirPerms;
+      this.filePerms = filePerms;
+    }
+  }
+
+  private static class WarehouseRootCheckResult {
+    final boolean shouldModifyManagedTableLocation;
+    final Path curWhRootPath;
+    final HadoopShims.HdfsEncryptionShim encryptionShim;
+
+    WarehouseRootCheckResult(
+            boolean shouldModifyManagedTableLocation,
+            Path curWhRootPath,
+            HadoopShims.HdfsEncryptionShim encryptionShim) {
+      this.shouldModifyManagedTableLocation = shouldModifyManagedTableLocation;
+      this.curWhRootPath = curWhRootPath;
+      this.encryptionShim = encryptionShim;
     }
   }
 
@@ -136,7 +201,15 @@ public class HiveStrictManagedMigration {
     int rc = 0;
     HiveStrictManagedMigration migration = null;
     try {
-      migration = new HiveStrictManagedMigration(runOptions);
+      HiveConf conf = hiveConf == null ? new HiveConf() : hiveConf;
+      WarehouseRootCheckResult warehouseRootCheckResult = checkOldWarehouseRoot(runOptions, conf);
+      runOptions = runOptions.setShouldModifyManagedTableLocation(
+              warehouseRootCheckResult.shouldModifyManagedTableLocation);
+      boolean createExternalDirsForDbs = checkExternalWarehouseDir(conf);
+      OwnerPermsOptions ownerPermsOptions = checkOwnerPermsOptions(runOptions, conf);
+
+      migration = new HiveStrictManagedMigration(
+              conf, runOptions, createExternalDirsForDbs, ownerPermsOptions, warehouseRootCheckResult);
       migration.run();
     } catch (Exception err) {
       LOG.error("Failed with error", err);
@@ -148,7 +221,9 @@ public class HiveStrictManagedMigration {
     }
 
     // TODO: Something is preventing the process from terminating after main(), adding exit() as hacky solution.
-    System.exit(rc);
+    if (hiveConf == null) {
+      System.exit(rc);
+    }
   }
 
   static Options createOptions() {
@@ -156,66 +231,84 @@ public class HiveStrictManagedMigration {
 
     // -hiveconf x=y
     result.addOption(OptionBuilder
-        .withValueSeparator()
-        .hasArgs(2)
-        .withArgName("property=value")
-        .withLongOpt("hiveconf")
-        .withDescription("Use value for given property")
-        .create());
+            .withValueSeparator()
+            .hasArgs(2)
+            .withArgName("property=value")
+            .withLongOpt("hiveconf")
+            .withDescription("Use value for given property")
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("dryRun")
-        .withDescription("Show what migration actions would be taken without actually running commands")
-        .create());
+            .withLongOpt("dryRun")
+            .withDescription("Show what migration actions would be taken without actually running commands")
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("dbRegex")
-        .withDescription("Regular expression to match database names on which this tool will be run")
-        .hasArg()
-        .create('d'));
+            .withLongOpt("dbRegex")
+            .withDescription("Regular expression to match database names on which this tool will be run")
+            .hasArg()
+            .create('d'));
 
     result.addOption(OptionBuilder
-        .withLongOpt("tableRegex")
-        .withDescription("Regular expression to match table names on which this tool will be run")
-        .hasArg()
-        .create('t'));
+            .withLongOpt("tableRegex")
+            .withDescription("Regular expression to match table names on which this tool will be run")
+            .hasArg()
+            .create('t'));
 
     result.addOption(OptionBuilder
-        .withLongOpt("oldWarehouseRoot")
-        .withDescription("Location of the previous warehouse root")
-        .hasArg()
-        .create());
+            .withLongOpt("oldWarehouseRoot")
+            .withDescription("Location of the previous warehouse root")
+            .hasArg()
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("migrationOption")
-        .withDescription("Table migration option (automatic|external|managed|validate|none)")
-        .hasArg()
-        .create('m'));
+            .withLongOpt("migrationOption")
+            .withDescription("Table migration option (automatic|external|managed|validate|none)")
+            .hasArg()
+            .create('m'));
 
     result.addOption(OptionBuilder
-        .withLongOpt("shouldModifyManagedTableLocation")
-        .withDescription("Whether managed tables should have their data moved from the old warehouse path to the current warehouse path")
-        .create());
+            .withLongOpt("shouldModifyManagedTableLocation")
+            .withDescription("Whether managed tables should have their data moved from " +
+                    "the old warehouse path to the current warehouse path")
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("shouldModifyManagedTableOwner")
-        .withDescription("Whether managed tables should have their directory owners changed to the hive user")
-        .create());
+            .withLongOpt("shouldModifyManagedTableOwner")
+            .withDescription("Whether managed tables should have their directory owners changed to the hive user")
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("shouldModifyManagedTablePermissions")
-        .withDescription("Whether managed tables should have their directory permissions changed to conform to strict managed tables mode")
-        .create());
+            .withLongOpt("shouldModifyManagedTablePermissions")
+            .withDescription("Whether managed tables should have their directory permissions changed to conform to " +
+                    "strict managed tables mode")
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("modifyManagedTables")
-        .withDescription("This setting enables the shouldModifyManagedTableLocation, shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions options")
-        .create());
+            .withLongOpt("modifyManagedTables")
+            .withDescription("This setting enables the shouldModifyManagedTableLocation, " +
+                    "shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions options")
+            .create());
 
     result.addOption(OptionBuilder
-        .withLongOpt("help")
-        .withDescription("print help message")
-        .create('h'));
+            .withLongOpt("help")
+            .withDescription("print help message")
+            .create('h'));
+
+    result.addOption(OptionBuilder
+            .withLongOpt("tablePoolSize")
+            .withDescription("Number of threads to process tables.")
+            .hasArg()
+            .create("tn"));
+
+    result.addOption(OptionBuilder
+            .withLongOpt("tableType")
+            .withDescription(String.format("Table type to match tables on which this tool will be run. " +
+                            "Possible values: %s. Default: all tables",
+                    Arrays.stream(TableType.values()).map(Enum::name).collect(Collectors.joining("|"))))
+            .hasArg()
+            .withArgName("table type")
+            .create("tt"));
 
     return result;
   }
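
For example, a caller might pass the new options the way the test added in this patch drives
main() (values here are illustrative; tableType must be one of the TableType enum names):

    String[] args = {
        "-m", "automatic",
        "--tableType", "EXTERNAL_TABLE",   // only process external tables
        "--tablePoolSize", "8"             // number of threads processing tables
    };
    HiveStrictManagedMigration.main(args);
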
@@ -251,6 +344,22 @@ public class HiveStrictManagedMigration {
     String oldWarehouseRoot = cli.getOptionValue("oldWarehouseRoot");
     boolean dryRun = cli.hasOption("dryRun");
 
+    String tableTypeText = cli.getOptionValue("tableType");
+
+    int defaultPoolSize = Runtime.getRuntime().availableProcessors() / 2;
+    if (defaultPoolSize < 1) {
+      defaultPoolSize = 1;
+    }
+
+    int databasePoolSize = getIntOptionValue(cli, "databasePoolSize", defaultPoolSize);
+    if (databasePoolSize < 1) {
+      throw new IllegalArgumentException("Please specify a positive integer option value for databasePoolSize");
+    }
+    int tablePoolSize = getIntOptionValue(cli, "tablePoolSize", defaultPoolSize);
+    if (tablePoolSize < 1) {
+      throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize");
+    }
+
     RunOptions runOpts = new RunOptions(
         dbRegex,
         tableRegex,
@@ -259,75 +368,137 @@ public class HiveStrictManagedMigration {
         shouldModifyManagedTableLocation,
         shouldModifyManagedTableOwner,
         shouldModifyManagedTablePermissions,
-        dryRun);
+        dryRun,
+        tableTypeText == null ? null : TableType.valueOf(tableTypeText),
+        tablePoolSize);
     return runOpts;
   }
 
-  private RunOptions runOptions;
-  private HiveConf conf;
-  private HiveMetaStoreClient hms;
-  private boolean failedValidationChecks;
-  private boolean failuresEncountered;
-  private Warehouse wh;
-  private Warehouse oldWh;
-  private String ownerName;
-  private String groupName;
-  private FsPermission dirPerms;
-  private FsPermission filePerms;
-  private boolean createExternalDirsForDbs;
-  Path curWhRootPath;
-  private HadoopShims.HdfsEncryptionShim encryptionShim;
-
-  HiveStrictManagedMigration(RunOptions runOptions) {
-    this.runOptions = runOptions;
-    this.conf = new HiveConf();
+  private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) {
+    if (commandLine.hasOption(optionName)) {
+      try {
+        return Integer.parseInt(commandLine.getOptionValue(optionName));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e);
+      }
+    }
+    return defaultValue;
   }
 
-  void run() throws Exception {
-    wh = new Warehouse(conf);
-    checkOldWarehouseRoot();
-    checkExternalWarehouseDir();
-    checkOwnerPermsOptions();
-
-    hms = new HiveMetaStoreClient(conf);//MetaException
-    try {
-      List<String> databases = hms.getAllDatabases();//TException
-      LOG.info("Found {} databases", databases.size());
-      for (String dbName : databases) {
-        if (dbName.matches(runOptions.dbRegex)) {
-          try {
-            processDatabase(dbName);
-          } catch (Exception err) {
-            LOG.error("Error processing database " + dbName, err);
-            failuresEncountered = true;
-          }
+  private final HiveConf conf;
+  private RunOptions runOptions;
+  private final boolean createExternalDirsForDbs;
+  private final Path curWhRootPath;
+  private final HadoopShims.HdfsEncryptionShim encryptionShim;
+  private final String ownerName;
+  private final String groupName;
+  private final FsPermission dirPerms;
+  private final FsPermission filePerms;
+
+  private CloseableThreadLocal<HiveMetaStoreClient> hms;
+  private ThreadLocal<Warehouse> wh;
+  private ThreadLocal<Warehouse> oldWh;
+  private CloseableThreadLocal<HiveUpdater> hiveUpdater;
+
+  private AtomicBoolean failuresEncountered;
+  private AtomicBoolean failedValidationChecks;
+
+  HiveStrictManagedMigration(HiveConf conf, RunOptions runOptions, boolean createExternalDirsForDbs,
+                             OwnerPermsOptions ownerPermsOptions, WarehouseRootCheckResult warehouseRootCheckResult) {
+    this.conf = conf;
+    this.runOptions = runOptions;
+    this.createExternalDirsForDbs = createExternalDirsForDbs;
+    this.ownerName = ownerPermsOptions.ownerName;
+    this.groupName = ownerPermsOptions.groupName;
+    this.dirPerms = ownerPermsOptions.dirPerms;
+    this.filePerms = ownerPermsOptions.filePerms;
+    this.curWhRootPath = warehouseRootCheckResult.curWhRootPath;
+    this.encryptionShim = warehouseRootCheckResult.encryptionShim;
+
+    this.hms = new CloseableThreadLocal<>(() -> {
+      try {
+        HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(conf);
+        if (hiveConf != null) {
+          SessionState ss = SessionState.start(conf);
+          ss.applyAuthorizationPolicy();
         }
+        return hiveMetaStoreClient;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-      LOG.info("Done processing databases.");
-    } finally {
-      hms.close();
+    }, runOptions.tablePoolSize);
+    wh = ThreadLocal.withInitial(() -> {
+      try {
+        return new Warehouse(conf);
+      } catch (MetaException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    if (runOptions.shouldModifyManagedTableLocation) {
+      Configuration oldConf = new Configuration(conf);
+      HiveConf.setVar(oldConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
+
+      oldWh = ThreadLocal.withInitial(() -> {
+        try {
+          return new Warehouse(oldConf);
+        } catch (MetaException e) {
+          throw new RuntimeException(e);
+        }
+      });
     }
+    this.hiveUpdater = new CloseableThreadLocal<>(() -> {
+      try {
+        return new HiveUpdater(conf, true);
+      } catch (HiveException e) {
+        throw new RuntimeException(e);
+      }
+    }, runOptions.tablePoolSize);
 
-    if (failuresEncountered) {
+    this.failuresEncountered = new AtomicBoolean(false);
+    this.failedValidationChecks = new AtomicBoolean(false);
+  }
+
+  void run() throws Exception {
+    LOG.info("Starting with {}", runOptions);
+
+    List<String> databases = hms.get().getDatabases(runOptions.dbRegex); //TException
+    LOG.info("Found {} databases", databases.size());
+    ForkJoinPool tablePool = new ForkJoinPool(
+            runOptions.tablePoolSize,
+            new NamedForkJoinWorkerThreadFactory("Table-"),
+            getUncaughtExceptionHandler(),
+            false);
+    databases.forEach(dbName -> processDatabase(dbName, tablePool));
+    LOG.info("Done processing databases.");
+
+    if (failuresEncountered.get()) {
       throw new HiveException("One or more failures encountered during processing.");
     }
-    if (failedValidationChecks) {
+    if (failedValidationChecks.get()) {
       throw new HiveException("One or more tables failed validation checks for strict managed table mode.");
     }
   }
 
-  void checkOldWarehouseRoot() throws IOException, MetaException {
+  private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e);
+  }
+
+  static WarehouseRootCheckResult checkOldWarehouseRoot(RunOptions runOptions, HiveConf conf) throws IOException {
+    boolean shouldModifyManagedTableLocation = runOptions.shouldModifyManagedTableLocation;
+    Path curWhRootPath = null;
+    HadoopShims.HdfsEncryptionShim encryptionShim = null;
+
     if (runOptions.shouldModifyManagedTableLocation) {
       if (runOptions.oldWarehouseRoot == null) {
         LOG.info("oldWarehouseRoot is not specified. Disabling shouldModifyManagedTableLocation");
-        runOptions.shouldModifyManagedTableLocation = false;
+        shouldModifyManagedTableLocation = false;
       } else {
         String curWarehouseRoot = HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREWAREHOUSE);
         if (arePathsEqual(conf, runOptions.oldWarehouseRoot, curWarehouseRoot)) {
           LOG.info("oldWarehouseRoot is the same as the current warehouse root {}."
               + " Disabling shouldModifyManagedTableLocation",
               runOptions.oldWarehouseRoot);
-          runOptions.shouldModifyManagedTableLocation = false;
+          shouldModifyManagedTableLocation = false;
         } else {
           Path oldWhRootPath = new Path(runOptions.oldWarehouseRoot);
           curWhRootPath = new Path(curWarehouseRoot);
@@ -339,18 +510,18 @@ public class HiveStrictManagedMigration {
             LOG.info("oldWarehouseRoot {} has a different FS than the current warehouse root {}."
                 + " Disabling shouldModifyManagedTableLocation",
                 runOptions.oldWarehouseRoot, curWarehouseRoot);
-            runOptions.shouldModifyManagedTableLocation = false;
+            shouldModifyManagedTableLocation = false;
           } else {
             if (!isHdfs(oldWhRootFs)) {
               LOG.info("Warehouse is using non-HDFS FileSystem {}. Disabling shouldModifyManagedTableLocation",
                   oldWhRootFs.getUri());
-              runOptions.shouldModifyManagedTableLocation = false;
+              shouldModifyManagedTableLocation = false;
             } else {
               encryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(oldWhRootFs, conf);
               if (!hasEquivalentEncryption(encryptionShim, oldWhRootPath, curWhRootPath)) {
                 LOG.info("oldWarehouseRoot {} and current warehouse root {} have different encryption zones." +
                     " Disabling shouldModifyManagedTableLocation", oldWhRootPath, curWhRootPath);
-                runOptions.shouldModifyManagedTableLocation = false;
+                shouldModifyManagedTableLocation = false;
               }
             }
           }
@@ -358,14 +529,15 @@ public class HiveStrictManagedMigration {
       }
     }
 
-    if (runOptions.shouldModifyManagedTableLocation) {
-      Configuration oldWhConf = new Configuration(conf);
-      HiveConf.setVar(oldWhConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
-      oldWh = new Warehouse(oldWhConf);
-    }
+    return new WarehouseRootCheckResult(shouldModifyManagedTableLocation, curWhRootPath, encryptionShim);
   }
 
-  void checkOwnerPermsOptions() {
+  static OwnerPermsOptions checkOwnerPermsOptions(RunOptions runOptions, HiveConf conf) {
+    String ownerName = null;
+    String groupName = null;
+    FsPermission dirPerms = null;
+    FsPermission filePerms = null;
+
     if (runOptions.shouldModifyManagedTableOwner) {
       ownerName = conf.get("strict.managed.tables.migration.owner", "hive");
       groupName = conf.get("strict.managed.tables.migration.group", null);
@@ -380,61 +552,72 @@ public class HiveStrictManagedMigration {
         filePerms = new FsPermission(filePermsString);
       }
     }
+
+    return new OwnerPermsOptions(ownerName, groupName, dirPerms, filePerms);
   }
 
-  void checkExternalWarehouseDir() {
+  static boolean checkExternalWarehouseDir(HiveConf conf) {
     String externalWarehouseDir = conf.getVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL);
-    if (externalWarehouseDir != null && !externalWarehouseDir.isEmpty()) {
-      createExternalDirsForDbs = true;
-    }
+    return externalWarehouseDir != null && !externalWarehouseDir.isEmpty();
   }
 
-  void processDatabase(String dbName) throws IOException, HiveException, MetaException, TException {
-    LOG.info("Processing database {}", dbName);
-    Database dbObj = hms.getDatabase(dbName);
+  void processDatabase(String dbName, ForkJoinPool tablePool) {
+    try {
+      LOG.info("Processing database {}", dbName);
+      Database dbObj = hms.get().getDatabase(dbName);
 
-    boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj);
-    if (modifyDefaultManagedLocation) {
-      Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName);
+      boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj);
+      if (modifyDefaultManagedLocation) {
+        Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName);
 
-      LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
-      if (!runOptions.dryRun) {
-        FileSystem fs = newDefaultDbLocation.getFileSystem(conf);
-        FileUtils.mkdir(fs, newDefaultDbLocation, conf);
-        // Set appropriate owner/perms of the DB dir only, no need to recurse
-        checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
-            ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
+        LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
+        if (!runOptions.dryRun) {
+          FileSystem fs = newDefaultDbLocation.getFileSystem(conf);
+          FileUtils.mkdir(fs, newDefaultDbLocation, conf);
+          // Set appropriate owner/perms of the DB dir only, no need to recurse
+          checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
+                  ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
+        }
       }
-    }
 
-    if (createExternalDirsForDbs) {
-      createExternalDbDir(dbObj);
-    }
+      if (createExternalDirsForDbs) {
+        createExternalDbDir(dbObj);
+      }
 
-    boolean errorsInThisDb = false;
-    List<String> tableNames = hms.getTables(dbName, runOptions.tableRegex);
-    for (String tableName : tableNames) {
-      // If we did not change the DB location, there is no need to move the table directories.
-      try {
-        processTable(dbObj, tableName, modifyDefaultManagedLocation);
-      } catch (Exception err) {
-        LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), err);
-        failuresEncountered = true;
-        errorsInThisDb = true;
+      List<String> tableNames;
+      if (runOptions.tableType == null) {
+        tableNames = hms.get().getTables(dbName, runOptions.tableRegex);
+        LOG.debug("found {} tables in {}", tableNames.size(), dbName);
+      } else {
+        tableNames = hms.get().getTables(dbName, runOptions.tableRegex, runOptions.tableType);
+        LOG.debug("found {} {}s in {}", tableNames.size(), runOptions.tableType.name(), dbName);
       }
-    }
 
-    // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
-    if (modifyDefaultManagedLocation) {
+      boolean errorsInThisDb = !tablePool.submit(() -> tableNames.parallelStream()
+              .map(tableName -> processTable(dbObj, tableName, modifyDefaultManagedLocation))
+              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2)).get();
       if (errorsInThisDb) {
-        LOG.error("Not updating database location for {} since an error was encountered. The migration must be run again for this database.",
-                dbObj.getName());
-      } else {
-        Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName);
-        // dbObj after this call would have the new DB location.
-        // Keep that in mind if anything below this requires the old DB path.
-        getHiveUpdater().updateDbLocation(dbObj, newDefaultDbLocation);
+        failuresEncountered.set(true);
+      }
+
+      // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
+      if (modifyDefaultManagedLocation) {
+        if (errorsInThisDb) {
+          LOG.error("Not updating database location for {} since an error was encountered. " +
+                          "The migration must be run again for this database.", dbObj.getName());
+        } else {
+          Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName);
+          // dbObj after this call would have the new DB location.
+          // Keep that in mind if anything below this requires the old DB path.
+          hiveUpdater.get().updateDbLocation(dbObj, newDefaultDbLocation);
+        }
       }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Cancel processing " + dbName, e);
+    } catch (TException | IOException | HiveException | ExecutionException ex) {
+      LOG.error("Error processing database " + dbName, ex);
+      failuresEncountered.set(true);
     }
   }
 
@@ -464,44 +647,55 @@ public class HiveStrictManagedMigration {
     return false;
   }
 
-  void processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation)
-      throws HiveException, IOException, TException {
-    String dbName = dbObj.getName();
-    LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
+  boolean processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation) {
+    try {
+      String dbName = dbObj.getName();
+      LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
 
-    Table tableObj = hms.getTable(dbName, tableName);
-    TableType tableType = TableType.valueOf(tableObj.getTableType());
+      Table tableObj = hms.get().getTable(dbName, tableName);
+      TableType tableType = TableType.valueOf(tableObj.getTableType());
 
-    TableMigrationOption migrationOption = runOptions.migrationOption;
-    if (migrationOption == TableMigrationOption.AUTOMATIC) {
-      migrationOption = determineMigrationTypeAutomatically(tableObj, tableType, ownerName, conf, hms, null);
-    }
+      TableMigrationOption migrationOption = runOptions.migrationOption;
+      if (migrationOption == TableMigrationOption.AUTOMATIC) {
+        migrationOption = determineMigrationTypeAutomatically(
+                tableObj, tableType, ownerName, conf, hms.get(), null);
+      }
 
-    failedValidationChecks = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
-            getHiveUpdater(), hms, conf);
+      boolean failedValidationCheck = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
+              hiveUpdater.get(), hms.get(), conf);
 
-    if (!failedValidationChecks && (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE)) {
-      Path tablePath = new Path(tableObj.getSd().getLocation());
-      if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) {
-        Path newTablePath = wh.getDnsPath(
-            new Path(wh.getDefaultDatabasePath(dbName),
-                MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
-        moveTableData(dbObj, tableObj, newTablePath);
-        if (!runOptions.dryRun) {
-          // File ownership/permission checks should be done on the new table path.
-          tablePath = newTablePath;
-        }
+      if (failedValidationCheck) {
+        this.failedValidationChecks.set(true);
+        return true;
       }
 
-      if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
-        FileSystem fs = tablePath.getFileSystem(conf);
-        if (isHdfs(fs)) {
-          // TODO: what about partitions not in the default location?
-          checkAndSetFileOwnerPermissions(fs, tablePath,
-              ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
+      if (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
+        Path tablePath = new Path(tableObj.getSd().getLocation());
+        if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) {
+          Path newTablePath = wh.get().getDnsPath(
+                  new Path(wh.get().getDefaultDatabasePath(dbName),
+                          MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
+          moveTableData(dbObj, tableObj, newTablePath);
+          if (!runOptions.dryRun) {
+            // File ownership/permission checks should be done on the new table path.
+            tablePath = newTablePath;
+          }
+        }
+
+        if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
+          FileSystem fs = tablePath.getFileSystem(conf);
+          if (isHdfs(fs)) {
+            // TODO: what about partitions not in the default location?
+            checkAndSetFileOwnerPermissions(fs, tablePath,
+                    ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
+          }
         }
       }
+    } catch (Exception ex) {
+      LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), ex);
+      return false;
     }
+    return true;
   }
 
   boolean shouldModifyDatabaseLocation(Database dbObj) throws IOException, MetaException {
@@ -510,7 +704,7 @@ public class HiveStrictManagedMigration {
       // Check if the database location is in the default location based on the old warehouse root.
       // If so then change the database location to the default based on the current warehouse root.
       String dbLocation = dbObj.getLocationUri();
-      Path oldDefaultDbLocation = oldWh.getDefaultDatabasePath(dbName);
+      Path oldDefaultDbLocation = oldWh.get().getDefaultDatabasePath(dbName);
       if (arePathsEqual(conf, dbLocation, oldDefaultDbLocation.toString())) {
         if (hasEquivalentEncryption(encryptionShim, oldDefaultDbLocation, curWhRootPath)) {
           return true;
@@ -529,7 +723,7 @@ public class HiveStrictManagedMigration {
     // If so then change the table location to the default based on the current warehouse root.
     // The existing table directory will also be moved to the new default database directory.
     String tableLocation = tableObj.getSd().getLocation();
-    Path oldDefaultTableLocation = oldWh.getDefaultTablePath(dbObj, tableObj.getTableName());
+    Path oldDefaultTableLocation = oldWh.get().getDefaultTablePath(dbObj, tableObj.getTableName());
     if (arePathsEqual(conf, tableLocation, oldDefaultTableLocation.toString())) {
       if (hasEquivalentEncryption(encryptionShim, oldDefaultTableLocation, curWhRootPath)) {
         return true;
@@ -545,7 +739,7 @@ public class HiveStrictManagedMigration {
       throws IOException, MetaException {
     String tableName = tableObj.getTableName();
     String partLocation = partObj.getSd().getLocation();
-    Path oldDefaultPartLocation = oldWh.getDefaultPartitionPath(dbObj, tableObj, partSpec);
+    Path oldDefaultPartLocation = oldWh.get().getDefaultPartitionPath(dbObj, tableObj, partSpec);
     if (arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString())) {
       if (hasEquivalentEncryption(encryptionShim, oldDefaultPartLocation, curWhRootPath)) {
         return true;
@@ -558,7 +752,7 @@ public class HiveStrictManagedMigration {
   }
 
   void createExternalDbDir(Database dbObj) throws IOException, MetaException {
-    Path externalTableDbPath = wh.getDefaultExternalDatabasePath(dbObj.getName());
+    Path externalTableDbPath = wh.get().getDefaultExternalDatabasePath(dbObj.getName());
     FileSystem fs = externalTableDbPath.getFileSystem(conf);
     if (!fs.exists(externalTableDbPath)) {
       String dbOwner = ownerName;
@@ -621,19 +815,19 @@ public class HiveStrictManagedMigration {
     // locations to be in sync.
 
     if (isPartitionedTable(tableObj)) {
-      List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
+      List<String> partNames = hms.get().listPartitionNames(dbName, tableName, Short.MAX_VALUE);
       // TODO: Fetch partitions in batches?
       // TODO: Threadpool to process partitions?
       for (String partName : partNames) {
-        Partition partObj = hms.getPartition(dbName, tableName, partName);
+        Partition partObj = hms.get().getPartition(dbName, tableName, partName);
         Map<String, String> partSpec =
             Warehouse.makeSpecFromValues(tableObj.getPartitionKeys(), partObj.getValues());
         if (shouldModifyPartitionLocation(dbObj, tableObj, partObj, partSpec)) {
           // Table directory (which includes the partition directory) has already been moved,
           // just update the partition location in the metastore.
           if (!runOptions.dryRun) {
-            Path newPartPath = wh.getPartitionPath(newTablePath, partSpec);
-            getHiveUpdater().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
+            Path newPartPath = wh.get().getPartitionPath(newTablePath, partSpec);
+            hiveUpdater.get().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
           }
         }
       }
@@ -642,7 +836,7 @@ public class HiveStrictManagedMigration {
     // Finally update the table location. This would prevent this tool from processing this table again
     // on subsequent runs of the migration.
     if (!runOptions.dryRun) {
-      getHiveUpdater().updateTableLocation(tableObj, newTablePath);
+      hiveUpdater.get().updateTableLocation(tableObj, newTablePath);
     }
   }
 
@@ -907,6 +1101,7 @@ public class HiveStrictManagedMigration {
   }
 
   void cleanup() {
+    hms.close();
     if (hiveUpdater != null) {
       runAndLogErrors(() -> hiveUpdater.close());
       hiveUpdater = null;
@@ -917,13 +1112,6 @@ public class HiveStrictManagedMigration {
     return new HiveUpdater(conf, false);
   }
 
-  HiveUpdater getHiveUpdater() throws HiveException {
-    if (hiveUpdater == null) {
-      hiveUpdater = new HiveUpdater(conf, true);
-    }
-    return hiveUpdater;
-  }
-
   private static final class TxnCtx {
     public final long writeId;
     public final String validWriteIds;
@@ -936,7 +1124,7 @@ public class HiveStrictManagedMigration {
     }
   }
 
-  public static class HiveUpdater {
+  private static class HiveUpdater implements AutoCloseable {
     Hive hive;
     boolean doFileRename;
 
@@ -946,9 +1134,10 @@ public class HiveStrictManagedMigration {
       doFileRename = fileRename;
     }
 
-    void close() {
+    @Override
+    public void close() {
       if (hive != null) {
-        runAndLogErrors(() -> Hive.closeCurrent());
+        runAndLogErrors(Hive::closeCurrent);
         hive = null;
       }
     }
@@ -1122,8 +1311,6 @@ public class HiveStrictManagedMigration {
     }
   }
 
-  HiveUpdater hiveUpdater;
-
   interface ThrowableRunnable {
     void run() throws Exception;
   }
@@ -1157,7 +1344,7 @@ public class HiveStrictManagedMigration {
   }
 
   static boolean isHdfs(FileSystem fs) {
-    return fs.getScheme().equals("hdfs");
+    return scheme.equals(fs.getScheme());
   }
 
   static String getQualifiedName(Table tableObj) {
@@ -1335,4 +1522,12 @@ public class HiveStrictManagedMigration {
     }
     return true;
   }
+
+  /**
+   * Tests can set this when the configuration needs something other than the default values.
+   */
+  @VisibleForTesting
+  static HiveConf hiveConf = null;
+  @VisibleForTesting
+  static String scheme = "hdfs";
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
new file mode 100644
index 0000000..5b6eecc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+
+/**
+ * This class allows specifying a prefix for ForkJoinPool thread names.
+ */
+public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+
+  private final String namePrefix;
+
+  NamedForkJoinWorkerThreadFactory(String namePrefix) {
+    this.namePrefix = namePrefix;
+  }
+
+  @Override
+  public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+    ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+    worker.setName(namePrefix + worker.getName());
+    return worker;
+  }
+}
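
The factory is used in run() together with a well-known ForkJoinPool property: a parallel stream
started from a task submitted to a custom pool executes on that pool rather than on the JVM-wide
common pool. A minimal sketch of the pattern (hypothetical table names; the class sits in the
same package because the factory constructor is package-private):

    package org.apache.hadoop.hive.ql.util;

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;

    public class TablePoolSketch {
      public static void main(String[] args) throws Exception {
        ForkJoinPool tablePool = new ForkJoinPool(
            4,                                               // cf. the tablePoolSize option
            new NamedForkJoinWorkerThreadFactory("Table-"),
            (t, e) -> System.err.println(t.getName() + " exited with error: " + e),
            false);
        List<String> tables = Arrays.asList("t1", "t2", "t3");
        // The parallel stream runs on the "Table-..." worker threads.
        boolean allOk = tablePool.submit(() -> tables.parallelStream()
            .map(name -> {
              System.out.println(Thread.currentThread().getName() + " -> " + name);
              return true;                                   // true = processed successfully
            })
            .reduce(true, Boolean::logicalAnd)).get();
        System.out.println("all tables ok: " + allOk);
        tablePool.shutdown();
      }
    }
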
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 5f39fdc..7039b89 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.hive.ql;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -40,13 +47,6 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public abstract class TxnCommandsBaseForTests {
   private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class);
   //bucket count for test tables; set it to 1 for easier debugging
@@ -55,7 +55,7 @@ public abstract class TxnCommandsBaseForTests {
   public TestName testName = new TestName();
   protected HiveConf hiveConf;
   Driver d;
-  enum Table {
+  public enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
     ACIDTBL2("acidTbl2"),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java
new file mode 100644
index 0000000..25a59b7
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class CloseableThreadLocalTest {
+
+  private static class AutoCloseableStub implements AutoCloseable {
+
+    private boolean closed = false;
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() throws Exception {
+      closed = true;
+    }
+  }
+
+  @Test
+  public void testResourcesAreInitiallyNotClosed() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, 1);
+
+    assertThat(closeableThreadLocal.get().isClosed(), is(false));
+  }
+
+  @Test
+  public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+
+    closeableThreadLocal.close();
+
+    assertThat(asyncInstance.isClosed(), is(true));
+    assertThat(syncInstance.isClosed(), is(true));
+  }
+
+  @Test
+  public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    AutoCloseableStub ref1 = closeableThreadLocal.get();
+    AutoCloseableStub ref2 = closeableThreadLocal.get();
+    assertThat(ref1, is(ref2));
+  }
+
+  @Test
+  public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+    assertThat(asyncInstance, is(not(syncInstance)));
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java
new file mode 100644
index 0000000..057135b
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBL;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBLPART;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDNONBUCKET;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL2;
+
+import java.io.File;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHiveStrictManagedMigration extends TxnCommandsBaseForTests {
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestHiveStrictManagedMigration.class.getCanonicalName() + "-" +
+          System.currentTimeMillis()).getPath().replaceAll("\\\\", "/");
+
+  @Test
+  public void testUpgrade() throws Exception {
+    int[][] data = {{1, 2}, {3, 4}, {5, 6}};
+    runStatementOnDriver("DROP TABLE IF EXISTS test.TAcid");
+    runStatementOnDriver("DROP DATABASE IF EXISTS test");
+
+    runStatementOnDriver("CREATE DATABASE test");
+    runStatementOnDriver(
+            "CREATE TABLE test.TAcid (a int, b int) CLUSTERED BY (b) INTO 2 BUCKETS STORED AS orc TBLPROPERTIES" +
+                    " ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO test.TAcid" + makeValuesClause(data));
+
+    runStatementOnDriver(
+            "CREATE EXTERNAL TABLE texternal (a int, b int)");
+
+    String oldWarehouse = getWarehouseDir();
+    String[] args = {"--hiveconf", "hive.strict.managed.tables=true", "-m",  "automatic", "--modifyManagedTables",
+            "--oldWarehouseRoot", oldWarehouse};
+    HiveConf newConf = new HiveConf(hiveConf);
+    File newWarehouseDir = new File(getTestDataDir(), "newWarehouse");
+    newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, newWarehouseDir.getAbsolutePath());
+    newConf.set("strict.managed.tables.migration.owner", System.getProperty("user.name"));
+    HiveStrictManagedMigration.hiveConf = newConf;
+    HiveStrictManagedMigration.scheme = "file";
+    HiveStrictManagedMigration.main(args);
+
+    Assert.assertTrue(newWarehouseDir.exists());
+    Assert.assertTrue(new File(newWarehouseDir, ACIDTBL.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, ACIDTBLPART.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, NONACIDNONBUCKET.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL2.toString().toLowerCase()).exists());
+    Assert.assertTrue(new File(new File(newWarehouseDir, "test.db"), "tacid").exists());
+    Assert.assertTrue(new File(oldWarehouse, "texternal").exists());
+  }
+
+  @Override
+  protected String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+}