Posted to commits@hive.apache.org by jc...@apache.org on 2019/07/12 04:10:57 UTC

[hive] branch master updated: HIVE-21948: Implement parallel processing in Pre Upgrade Tool (Krisztian Kasa, reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9f1a0  HIVE-21948: Implement parallel processing in Pre Upgrade Tool (Krisztian Kasa, reviewed by Jesus Camacho Rodriguez)
9d9f1a0 is described below

commit 9d9f1a087303d4a166ab05bf26c276b5137b5cc9
Author: Krisztian Kasa <kk...@cloudera.com>
AuthorDate: Thu Jul 11 21:09:33 2019 -0700

    HIVE-21948: Implement parallel processing in Pre Upgrade Tool (Krisztian Kasa, reviewed by Jesus Camacho Rodriguez)
---
 .../hive/upgrade/acid/CloseableThreadLocal.java    |  61 ++++
 .../hive/upgrade/acid/CompactTablesState.java      |  59 ++++
 .../hive/upgrade/acid/CompactionMetaInfo.java      |  68 +++++
 .../acid/NamedForkJoinWorkerThreadFactory.java     |  40 +++
 .../hadoop/hive/upgrade/acid/PreUpgradeTool.java   | 316 ++++++++++++---------
 .../hadoop/hive/upgrade/acid/RunOptions.java       |  33 ++-
 .../upgrade/acid/TestCloseableThreadLocal.java     |  86 ++++++
 .../hadoop/hive/upgrade/acid/TestRunOptions.java   |  67 +++++
 8 files changed, 596 insertions(+), 134 deletions(-)

diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
new file mode 100644
index 0000000..fbe0a80
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class has similar functionality as java.lang.ThreadLocal.
+ * Plus it provides a close function to clean up unmanaged resources in all threads where the resource was initialized.
+ * @param <T> - type of resource
+ */
+public class CloseableThreadLocal<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+  private final ConcurrentHashMap<Thread, T> threadLocalMap;
+  private final Supplier<T> initialValue;
+  private final Consumer<T> closeFunction;
+
+  public CloseableThreadLocal(Supplier<T> initialValue, Consumer<T> closeFunction, int poolSize) {
+    this.initialValue = initialValue;
+    threadLocalMap = new ConcurrentHashMap<>(poolSize);
+    this.closeFunction = closeFunction;
+  }
+
+  public T get() {
+    return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+  }
+
+  public void close() {
+    threadLocalMap.values().forEach(this::closeQuietly);
+  }
+
+  private void closeQuietly(T resource) {
+    try {
+      closeFunction.accept(resource);
+    } catch (Exception e) {
+      LOG.warn("Error while closing resource.", e);
+    }
+  }
+}
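
For readers skimming the diff, here is a minimal, self-contained usage sketch of the new helper (illustrative only, not part of the commit; a Scanner stands in for a per-thread resource such as the IMetaStoreClient that PreUpgradeTool manages further down). Each thread that calls get() lazily receives its own instance, and a single close() disposes of every per-thread instance regardless of which thread created it:

    import java.util.Scanner;

    import org.apache.hadoop.hive.upgrade.acid.CloseableThreadLocal;

    public class CloseableThreadLocalDemo {
      public static void main(String[] args) {
        CloseableThreadLocal<Scanner> scanners =
            new CloseableThreadLocal<>(() -> new Scanner(System.in), Scanner::close, 2);

        Scanner first = scanners.get();      // created lazily for the calling thread
        Scanner again = scanners.get();      // same thread -> same cached instance
        System.out.println(first == again);  // true

        scanners.close();                    // closes the instance of every thread that called get()
      }
    }
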
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
new file mode 100644
index 0000000..beb934c
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static java.util.Collections.emptyList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Store result of database and table scan: compaction commands and meta info.
+ */
+public final class CompactTablesState {
+
+  public static CompactTablesState empty() {
+    return new CompactTablesState(emptyList(), new CompactionMetaInfo());
+  }
+
+  public static CompactTablesState compactions(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+    return new CompactTablesState(compactionCommands, compactionMetaInfo);
+  }
+
+  private final List<String> compactionCommands;
+  private final CompactionMetaInfo compactionMetaInfo;
+
+  private CompactTablesState(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+    this.compactionCommands = compactionCommands;
+    this.compactionMetaInfo = compactionMetaInfo;
+  }
+
+  public List<String> getCompactionCommands() {
+    return compactionCommands;
+  }
+
+  public CompactionMetaInfo getMetaInfo() {
+    return compactionMetaInfo;
+  }
+
+  public CompactTablesState merge(CompactTablesState other) {
+    List<String> compactionCommands = new ArrayList<>(this.compactionCommands);
+    compactionCommands.addAll(other.compactionCommands);
+    return new CompactTablesState(compactionCommands, this.compactionMetaInfo.merge(other.compactionMetaInfo));
+  }
+}
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
new file mode 100644
index 0000000..72b4ec6
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Store result of compaction calls.
+ */
+public class CompactionMetaInfo {
+  /**
+   * total number of bytes to be compacted across all compaction commands.
+   */
+  private long numberOfBytes;
+  /**
+   * IDs of compactions launched by this utility.
+   */
+  private final Set<Long> compactionIds;
+
+  public CompactionMetaInfo() {
+    compactionIds = new HashSet<>();
+    numberOfBytes = 0;
+  }
+
+  private CompactionMetaInfo(Set<Long> initialCompactionIds, long initialNumberOfBytes) {
+    this.compactionIds = new HashSet<>(initialCompactionIds);
+    numberOfBytes = initialNumberOfBytes;
+  }
+
+  public CompactionMetaInfo merge(CompactionMetaInfo other) {
+    CompactionMetaInfo result = new CompactionMetaInfo(this.compactionIds, this.numberOfBytes);
+    result.numberOfBytes += other.numberOfBytes;
+    result.compactionIds.addAll(other.compactionIds);
+    return result;
+  }
+
+  public long getNumberOfBytes() {
+    return numberOfBytes;
+  }
+
+  public void addBytes(long bytes) {
+    numberOfBytes += bytes;
+  }
+
+  public Set<Long> getCompactionIds() {
+    return compactionIds;
+  }
+
+  public void addCompactionId(long compactionId) {
+    compactionIds.add(compactionId);
+  }
+}
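
A small illustrative sketch of how these two value classes combine (the command strings and byte counts below are made up): merge() is non-destructive and returns a new object holding the union of both inputs, which is what lets PreUpgradeTool fold per-table results together further down in this commit.

    import java.util.Collections;

    import org.apache.hadoop.hive.upgrade.acid.CompactTablesState;
    import org.apache.hadoop.hive.upgrade.acid.CompactionMetaInfo;

    public class MergeDemo {
      public static void main(String[] args) {
        CompactionMetaInfo metaA = new CompactionMetaInfo();
        metaA.addBytes(1024L);
        metaA.addCompactionId(1L);

        CompactionMetaInfo metaB = new CompactionMetaInfo();
        metaB.addBytes(2048L);
        metaB.addCompactionId(2L);

        CompactTablesState a = CompactTablesState.compactions(
            Collections.singletonList("-- compaction command for db.t1"), metaA);
        CompactTablesState b = CompactTablesState.compactions(
            Collections.singletonList("-- compaction command for db.t2"), metaB);

        // merge() leaves both inputs untouched and returns the combined state
        CompactTablesState merged = a.merge(b);
        System.out.println(merged.getCompactionCommands().size());    // 2
        System.out.println(merged.getMetaInfo().getNumberOfBytes());  // 3072
        System.out.println(merged.getMetaInfo().getCompactionIds());  // contains 1 and 2
      }
    }
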
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
new file mode 100644
index 0000000..2b95f7b
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+
+/**
+ * This class allows specifying a prefix for ForkJoinPool thread names.
+ */
+public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+
+  NamedForkJoinWorkerThreadFactory(String namePrefix) {
+    this.namePrefix = namePrefix;
+  }
+
+  private final String namePrefix;
+
+  @Override
+  public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+    ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+    worker.setName(namePrefix + worker.getName());
+    return worker;
+  }
+}
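
A quick sketch of how the factory plugs into a pool, mirroring the ForkJoinPool construction added to PreUpgradeTool below (the parallelism value and the submitted task are illustrative; the demo sits in the same package because the factory constructor is package-private):

    package org.apache.hadoop.hive.upgrade.acid;

    import java.util.concurrent.ForkJoinPool;

    public class NamedPoolDemo {
      public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(
            4,                                                // parallelism (example value)
            new NamedForkJoinWorkerThreadFactory("Table-"),   // prefixes the default worker names
            (t, e) -> System.err.println(t.getName() + " exited with error: " + e),
            false);                                           // asyncMode off, as in the tool
        pool.submit(() -> System.out.println(Thread.currentThread().getName())).join();
        // prints something like: Table-ForkJoinPool-1-worker-1
        pool.shutdown();
      }
    }
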
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
index 0a7354d..5b0ad7c 100644
--- a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
@@ -29,9 +29,8 @@ import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 
 import org.apache.commons.cli.CommandLine;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -68,8 +68,8 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.security.AccessControlException;
@@ -116,26 +116,24 @@ import com.google.common.annotations.VisibleForTesting;
  *
  * See also org.apache.hadoop.hive.ql.util.UpgradeTool in Hive 3.x
  */
-public class PreUpgradeTool {
+public class PreUpgradeTool implements AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class);
   private static final int PARTITION_BATCH_SIZE = 10000;
-  private final Options cmdLineOptions = new Options();
 
   public static void main(String[] args) throws Exception {
-    PreUpgradeTool tool = new PreUpgradeTool();
-    tool.init();
+    Options cmdLineOptions = createCommandLineOptions();
     CommandLineParser parser = new GnuParser();
     CommandLine line ;
     try {
-      line = parser.parse(tool.cmdLineOptions, args);
+      line = parser.parse(cmdLineOptions, args);
     } catch (ParseException e) {
       System.err.println("PreUpgradeTool: Parsing failed.  Reason: " + e.getLocalizedMessage());
-      printAndExit(tool);
+      printAndExit(cmdLineOptions);
       return;
     }
     if (line.hasOption("help")) {
       HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+      formatter.printHelp("upgrade-acid", cmdLineOptions);
       return;
     }
     RunOptions runOptions = RunOptions.fromCommandLine(line);
@@ -144,25 +142,53 @@ public class PreUpgradeTool {
     try {
       String hiveVer = HiveVersionInfo.getShortVersion();
       LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " +
-          HiveVersionInfo.getBuildVersion());
+              HiveVersionInfo.getBuildVersion());
       if(!hiveVer.startsWith("2.")) {
         throw new IllegalStateException("preUpgrade requires Hive 2.x.  Actual: " + hiveVer);
       }
-      tool.prepareAcidUpgradeInternal(runOptions);
+      try (PreUpgradeTool tool = new PreUpgradeTool(runOptions)) {
+        tool.prepareAcidUpgradeInternal();
+      }
     }
     catch(Exception ex) {
       LOG.error("PreUpgradeTool failed", ex);
       throw ex;
     }
   }
-  private static void printAndExit(PreUpgradeTool tool) {
+
+  private final HiveConf conf;
+  private final CloseableThreadLocal<IMetaStoreClient> metaStoreClient;
+  private final ThreadLocal<ValidTxnList> txns;
+  private final RunOptions runOptions;
+
+  public PreUpgradeTool(RunOptions runOptions) {
+    this.runOptions = runOptions;
+    this.conf = hiveConf != null ? hiveConf : new HiveConf();
+    this.metaStoreClient = new CloseableThreadLocal<>(this::getHMS, IMetaStoreClient::close,
+            runOptions.getTablePoolSize());
+    this.txns = ThreadLocal.withInitial(() -> {
+      /*
+       This API changed from 2.x to 3.0.  so this won't even compile with 3.0
+       but it doesn't need to since we only run this preUpgrade
+      */
+      try {
+        TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+        return TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+      } catch (MetaException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  private static void printAndExit(Options cmdLineOptions) {
     HelpFormatter formatter = new HelpFormatter();
-    formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+    formatter.printHelp("upgrade-acid", cmdLineOptions);
     System.exit(1);
   }
 
-  private void init() {
+  static Options createCommandLineOptions() {
     try {
+      Options cmdLineOptions = new Options();
       cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" +
           " cluster.  This requires 2.x binaries on the classpath and hive-site.xml."));
       Option exec = new Option("execute",
@@ -196,23 +222,32 @@ public class PreUpgradeTool {
       tableTypeOption.setArgs(1);
       tableTypeOption.setArgName("table type");
       cmdLineOptions.addOption(tableTypeOption);
+
+      Option tablePoolSizeOption = new Option("tn", "Number of threads to process tables.");
+      tablePoolSizeOption.setLongOpt("tablePoolSize");
+      tablePoolSizeOption.setArgs(1);
+      tablePoolSizeOption.setArgName("pool size");
+      cmdLineOptions.addOption(tablePoolSizeOption);
+
+      return cmdLineOptions;
     }
     catch(Exception ex) {
       LOG.error("init()", ex);
       throw ex;
     }
   }
+
   private static HiveMetaHookLoader getHookLoader() {
     return new HiveMetaHookLoader() {
       @Override
       public HiveMetaHook getHook(
-          org.apache.hadoop.hive.metastore.api.Table tbl) {
+              org.apache.hadoop.hive.metastore.api.Table tbl) {
         return null;
       }
     };
   }
 
-  private static IMetaStoreClient getHMS(HiveConf conf) {
+  public IMetaStoreClient getHMS() {
     UserGroupInformation loggedInUser = null;
     try {
       loggedInUser = UserGroupInformation.getLoginUser();
@@ -229,90 +264,51 @@ public class PreUpgradeTool {
       which calls HiveMetaStoreClient(HiveConf, Boolean) which exists in
        (at least) 2.1.0.2.6.5.0-292 and later but not in 2.1.0.2.6.0.3-8 (the HDP 2.6 release)
        i.e. RetryingMetaStoreClient.getProxy(conf, true) is broken in 2.6.0*/
-      return RetryingMetaStoreClient.getProxy(conf,
-          new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class},
-          new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName());
-    } catch (MetaException e) {
+      IMetaStoreClient client = RetryingMetaStoreClient.getProxy(conf,
+              new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class},
+              new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName());
+      if (hiveConf != null) {
+        SessionState ss = SessionState.start(conf);
+        ss.applyAuthorizationPolicy();
+      }
+      return client;
+    } catch (MetaException | HiveException e) {
       throw new RuntimeException("Error connecting to Hive Metastore URI: "
-          + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e);
+              + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e);
     }
   }
 
   /**
    * todo: change script comments to a preamble instead of a footer
    */
-  private void prepareAcidUpgradeInternal(RunOptions runOptions)
+  private void prepareAcidUpgradeInternal()
       throws HiveException, TException, IOException {
-    HiveConf conf = hiveConf != null ? hiveConf : new HiveConf();
-    boolean isAcidEnabled = isAcidEnabled(conf);
-    IMetaStoreClient hms = getHMS(conf);
+    if (!isAcidEnabled(conf)) {
+      LOG.info("acid is off, there can't be any acid tables - nothing to compact");
+      return;
+    }
+    IMetaStoreClient hms = metaStoreClient.get();
     LOG.debug("Looking for databases");
     String exceptionMsg = null;
     List<String> databases;
-    List<String> compactions = new ArrayList<>();
-    final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
-    ValidTxnList txns = null;
-    Hive db = null;
+    CompactTablesState compactTablesState;
     try {
       databases = hms.getDatabases(runOptions.getDbRegex()); //TException
       LOG.debug("Found " + databases.size() + " databases to process");
-      if (runOptions.isExecute()) {
-        db = Hive.get(conf);
-      }
 
-      for (String dbName : databases) {
-        try {
-          List<String> tables;
-          if (runOptions.getTableType() == null) {
-            tables = hms.getTables(dbName, runOptions.getTableRegex());
-            LOG.debug("found {} tables in {}", tables.size(), dbName);
-          } else {
-            tables = hms.getTables(dbName, runOptions.getTableRegex(), runOptions.getTableType());
-            LOG.debug("found {} {} in {}", tables.size(), runOptions.getTableType().name(), dbName);
-          }
-          for (String tableName : tables) {
-            try {
-              Table t = hms.getTable(dbName, tableName);
-              LOG.debug("processing table " + Warehouse.getQualifiedName(t));
-              if (isAcidEnabled) {
-                //if acid is off, there can't be any acid tables - nothing to compact
-                if (txns == null) {
-          /*
-           This API changed from 2.x to 3.0.  so this won't even compile with 3.0
-           but it doesn't need to since we only run this preUpgrade
-          */
-                  TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-                  txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
-                }
-                List<String> compactionCommands =
-                        getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), db, txns);
-                compactions.addAll(compactionCommands);
-              }
-              /*todo: handle renaming files somewhere*/
-            } catch (Exception e) {
-              if (isAccessControlException(e)) {
-                // this could be external table with 0 permission for hive user
-                exceptionMsg = "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " +
-                  "to databases and tables to determine if a table has to be compacted. " +
-                  "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
-                  "false to allow read-access to databases and tables and retry the pre-upgrade tool again..";
-              }
-              throw e;
-            }
-          }
-        } catch (Exception e) {
-          if (exceptionMsg == null && isAccessControlException(e)) {
-            // we may not have access to read all tables from this db
-            exceptionMsg = "Unable to access " + dbName + ". Pre-upgrade tool requires read-access " +
-              "to databases and tables to determine if a table has to be compacted. " +
-              "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
-              "false to allow read-access to databases and tables and retry the pre-upgrade tool again..";
-          }
-          throw e;
-        }
-      }
+      ForkJoinPool processTablePool = new ForkJoinPool(
+              runOptions.getTablePoolSize(),
+              new NamedForkJoinWorkerThreadFactory("Table-"),
+              getUncaughtExceptionHandler(),
+              false
+              );
+      compactTablesState = databases.stream()
+                      .map(dbName -> processDatabase(dbName, processTablePool, runOptions))
+                      .reduce(CompactTablesState::merge)
+              .orElse(CompactTablesState.empty());
+
     } catch (Exception e) {
-      if (exceptionMsg == null && isAccessControlException(e)) {
+      if (isAccessControlException(e)) {
         exceptionMsg = "Unable to get databases. Pre-upgrade tool requires read-access " +
           "to databases and tables to determine if a table has to be compacted. " +
           "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
@@ -321,27 +317,27 @@ public class PreUpgradeTool {
       throw new HiveException(exceptionMsg, e);
     }
 
-    makeCompactionScript(compactions, runOptions.getOutputDir(), compactionMetaInfo);
+    makeCompactionScript(compactTablesState, runOptions.getOutputDir());
 
     if(runOptions.isExecute()) {
-      while(compactionMetaInfo.compactionIds.size() > 0) {
-        LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() +
+      while(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) {
+        LOG.debug("Will wait for " + compactTablesState.getMetaInfo().getCompactionIds().size() +
             " compactions to complete");
-        ShowCompactResponse resp = db.showCompactions();
+        ShowCompactResponse resp = hms.showCompactions();
         for(ShowCompactResponseElement e : resp.getCompacts()) {
           final String state = e.getState();
           boolean removed;
           switch (state) {
             case TxnStore.CLEANING_RESPONSE:
             case TxnStore.SUCCEEDED_RESPONSE:
-              removed = compactionMetaInfo.compactionIds.remove(e.getId());
+              removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId());
               if(removed) {
                 LOG.debug("Required compaction succeeded: " + e.toString());
               }
               break;
             case TxnStore.ATTEMPTED_RESPONSE:
             case TxnStore.FAILED_RESPONSE:
-              removed = compactionMetaInfo.compactionIds.remove(e.getId());
+              removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId());
               if(removed) {
                 LOG.warn("Required compaction failed: " + e.toString());
               }
@@ -357,7 +353,7 @@ public class PreUpgradeTool {
               LOG.error("Unexpected state for : " + e.toString());
           }
         }
-        if(compactionMetaInfo.compactionIds.size() > 0) {
+        if(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) {
           try {
             if (callback != null) {
               callback.onWaitForCompaction();
@@ -371,6 +367,67 @@ public class PreUpgradeTool {
     }
   }
 
+  private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e);
+  }
+
+  private CompactTablesState processDatabase(
+          String dbName, ForkJoinPool threadPool, RunOptions runOptions) {
+    try {
+      IMetaStoreClient hms = metaStoreClient.get();
+
+      List<String> tables;
+      if (runOptions.getTableType() == null) {
+        tables = hms.getTables(dbName, runOptions.getTableRegex());
+        LOG.debug("found {} tables in {}", tables.size(), dbName);
+      }
+      else {
+        tables = hms.getTables(dbName, runOptions.getTableRegex(), runOptions.getTableType());
+        LOG.debug("found {} {} in {}", tables.size(), runOptions.getTableType().name(), dbName);
+      }
+
+      return threadPool.submit(
+              () -> tables.parallelStream()
+                      .map(table -> processTable(dbName, table, runOptions))
+                      .reduce(CompactTablesState::merge)).get()
+              .orElse(CompactTablesState.empty());
+    } catch (Exception e) {
+      if (isAccessControlException(e)) {
+        // we may not have access to read all tables from this db
+        throw new RuntimeException("Unable to access " + dbName + ". Pre-upgrade tool requires read-access " +
+                "to databases and tables to determine if a table has to be compacted. " +
+                "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
+                "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  private CompactTablesState processTable(
+          String dbName, String tableName, RunOptions runOptions) {
+    try {
+      IMetaStoreClient hms = metaStoreClient.get();
+      final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+
+      Table t = hms.getTable(dbName, tableName);
+      LOG.debug("processing table " + Warehouse.getQualifiedName(t));
+      List<String> compactionCommands =
+              getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), txns.get());
+      return CompactTablesState.compactions(compactionCommands, compactionMetaInfo);
+      /*todo: handle renaming files somewhere*/
+    } catch (Exception e) {
+      if (isAccessControlException(e)) {
+        // this could be external table with 0 permission for hive user
+        throw new RuntimeException(
+                "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " +
+                "to databases and tables to determine if a table has to be compacted. " +
+                "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
+                "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
   private boolean isAccessControlException(final Exception e) {
     // hadoop security AccessControlException
     if ((e instanceof MetaException && e.getCause() instanceof AccessControlException) ||
@@ -391,25 +448,25 @@ public class PreUpgradeTool {
   /**
    * Generates a set compaction commands to run on pre Hive 3 cluster
    */
-  private static void makeCompactionScript(List<String> commands, String scriptLocation,
-      CompactionMetaInfo compactionMetaInfo) throws IOException {
-    if (commands.isEmpty()) {
+  private static void makeCompactionScript(CompactTablesState result, String scriptLocation) throws IOException {
+    if (result.getCompactionCommands().isEmpty()) {
       LOG.info("No compaction is necessary");
       return;
     }
     String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
     LOG.debug("Writing compaction commands to " + fileName);
-    try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
+    try(PrintWriter pw = createScript(
+            result.getCompactionCommands(), fileName, scriptLocation)) {
       //add post script
-      pw.println("-- Generated total of " + commands.size() + " compaction commands");
-      if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+      pw.println("-- Generated total of " + result.getCompactionCommands().size() + " compaction commands");
+      if(result.getMetaInfo().getNumberOfBytes() < Math.pow(2, 20)) {
         //to see it working in UTs
         pw.println("-- The total volume of data to be compacted is " +
-            String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+            String.format("%.6fMB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 20)));
       }
       else {
         pw.println("-- The total volume of data to be compacted is " +
-            String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+            String.format("%.3fGB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 30)));
       }
       pw.println();
       //todo: should be at the top of the file...
@@ -438,7 +495,7 @@ public class PreUpgradeTool {
    * @return any compaction commands to run for {@code Table t}
    */
   private static List<String> getCompactionCommands(Table t, HiveConf conf,
-      IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db,
+      IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute,
       ValidTxnList txns) throws IOException, TException, HiveException {
     if(!isFullAcidTable(t)) {
       return Collections.emptyList();
@@ -452,7 +509,7 @@ public class PreUpgradeTool {
       List<String> cmds = new ArrayList<>();
       cmds.add(getCompactionCommand(t, null));
       if(execute) {
-        scheduleCompaction(t, null, db, compactionMetaInfo);
+        scheduleCompaction(t, null, hms, compactionMetaInfo);
       }
       return cmds;
     }
@@ -463,19 +520,19 @@ public class PreUpgradeTool {
     for(int i = 0; i < numWholeBatches; i++) {
       List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
           partNames.subList(i * batchSize, (i + 1) * batchSize));
-      getCompactionCommands(t, partitionList, db, execute, compactionCommands,
+      getCompactionCommands(t, partitionList, hms, execute, compactionCommands,
           compactionMetaInfo, conf, txns);
     }
     if(numWholeBatches * batchSize < partNames.size()) {
       //last partial batch
       List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
           partNames.subList(numWholeBatches * batchSize, partNames.size()));
-      getCompactionCommands(t, partitionList, db, execute, compactionCommands,
+      getCompactionCommands(t, partitionList, hms, execute, compactionCommands,
           compactionMetaInfo, conf, txns);
     }
     return compactionCommands;
   }
-  private static void getCompactionCommands(Table t, List<Partition> partitionList, Hive db,
+  private static void getCompactionCommands(Table t, List<Partition> partitionList, IMetaStoreClient hms,
       boolean execute, List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo,
       HiveConf conf, ValidTxnList txns)
       throws IOException, TException, HiveException {
@@ -483,28 +540,31 @@ public class PreUpgradeTool {
       if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) {
         compactionCommands.add(getCompactionCommand(t, p));
         if (execute) {
-          scheduleCompaction(t, p, db, compactionMetaInfo);
+          scheduleCompaction(t, p, hms, compactionMetaInfo);
         }
       }
     }
   }
-  private static void scheduleCompaction(Table t, Partition p, Hive db,
+  private static void scheduleCompaction(Table t, Partition p, IMetaStoreClient db,
       CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException {
     String partName = p == null ? null :
         Warehouse.makePartName(t.getPartitionKeys(), p.getValues());
-    CompactionResponse resp =
-        //this gives an easy way to get at compaction ID so we can only wait for those this
-        //utility started
-        db.compact2(t.getDbName(), t.getTableName(), partName, "major", null);
-    if(!resp.isAccepted()) {
-      LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) +
-          " is already being compacted with id=" + resp.getId());
-    }
-    else {
-      LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) +
-          (p == null ? "" : "/" + partName) + " with id=" + resp.getId());
-    }
-    compactionMetaInfo.compactionIds.add(resp.getId());
+    try {
+      CompactionResponse resp =
+              //this gives an easy way to get at compaction ID so we can only wait for those this
+              //utility started
+              db.compact2(t.getDbName(), t.getTableName(), partName, CompactionType.MAJOR, null);
+      if (!resp.isAccepted()) {
+        LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) +
+                " is already being compacted with id=" + resp.getId());
+      } else {
+        LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) +
+                (p == null ? "" : "/" + partName) + " with id=" + resp.getId());
+      }
+      compactionMetaInfo.addCompactionId(resp.getId());
+    } catch (TException e) {
+      throw new HiveException(e);
+    }
   }
 
   /**
@@ -547,14 +607,14 @@ public class PreUpgradeTool {
         }
         if(needsCompaction(bucket, fs)) {
           //found delete events - this 'location' needs compacting
-          compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+          compactionMetaInfo.addBytes(getDataSize(location, conf));
 
           //if there are un-compacted original files, they will be included in compaction, so
           //count at the size for 'cost' estimation later
           for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) {
             FileStatus fileStatus = origFile.getFileStatus();
             if(fileStatus != null) {
-              compactionMetaInfo.numberOfBytes += fileStatus.getLen();
+              compactionMetaInfo.addBytes(fileStatus.getLen());
             }
           }
           return true;
@@ -664,15 +724,9 @@ public class PreUpgradeTool {
     return txnMgr.equals(dbTxnMgr) && concurrency;
   }
 
-  private static class CompactionMetaInfo {
-    /**
-     * total number of bytes to be compacted across all compaction commands
-     */
-    long numberOfBytes;
-    /**
-     * IDs of compactions launched by this utility
-     */
-    Set<Long> compactionIds = new HashSet<>();
+  @Override
+  public void close() {
+    metaStoreClient.close();
   }
 
   @VisibleForTesting
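
The heart of the change is the two-level fan-out visible above: databases are scanned one by one, the tables of each database are processed on a bounded ForkJoinPool via parallelStream(), and the per-table results are folded back together with CompactTablesState::merge. A stripped-down, self-contained sketch of that pattern follows (listTables and processTable are placeholders here, not the tool's real methods):

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;

    import org.apache.hadoop.hive.upgrade.acid.CompactTablesState;

    public class ParallelScanSketch {

      CompactTablesState scan(List<String> databases, ForkJoinPool tablePool) throws Exception {
        CompactTablesState result = CompactTablesState.empty();
        for (String dbName : databases) {
          List<String> tables = listTables(dbName);          // placeholder for hms.getTables(...)
          // Submitting the stream from inside the pool makes the parallelStream() workers
          // run on (and respect the size of) this pool instead of the common ForkJoinPool.
          CompactTablesState dbState = tablePool.submit(
              () -> tables.parallelStream()
                      .map(table -> processTable(dbName, table))
                      .reduce(CompactTablesState::merge))
              .get()
              .orElse(CompactTablesState.empty());
          result = result.merge(dbState);
        }
        return result;
      }

      private List<String> listTables(String dbName) {
        return Collections.emptyList();                      // placeholder for the metastore call
      }

      private CompactTablesState processTable(String dbName, String table) {
        return CompactTablesState.empty();                   // placeholder for the real per-table scan
      }
    }
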
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java
index 66213d4..534b971 100644
--- a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java
@@ -27,13 +27,33 @@ public class RunOptions {
 
   public static RunOptions fromCommandLine(CommandLine commandLine) {
     String tableTypeText = commandLine.getOptionValue("tableType");
+
+    int defaultPoolSize = Runtime.getRuntime().availableProcessors();
+    if (defaultPoolSize < 1)
+      defaultPoolSize = 1;
+
+    int tablePoolSize = getIntOptionValue(commandLine, "tablePoolSize", defaultPoolSize);
+    if (tablePoolSize < 1)
+      throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize");
+
     return new RunOptions(
       commandLine.getOptionValue("location", "."),
       commandLine.hasOption("execute"),
       commandLine.getOptionValue("dbRegex", ".*"),
       commandLine.getOptionValue("tableRegex", ".*"),
-      tableTypeText == null ? null : TableType.valueOf(tableTypeText)
-    );
+      tableTypeText == null ? null : TableType.valueOf(tableTypeText),
+      tablePoolSize);
+  }
+
+  private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) {
+    if (commandLine.hasOption(optionName)) {
+      try {
+        return Integer.parseInt(commandLine.getOptionValue(optionName));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e);
+      }
+    }
+    return defaultValue;
   }
 
   private final String outputDir;
@@ -41,13 +61,15 @@ public class RunOptions {
   private final String dbRegex;
   private final String tableRegex;
   private final TableType tableType;
+  private final int tablePoolSize;
 
-  public RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, TableType tableType) {
+  private RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, TableType tableType, int tablePoolSize) {
     this.outputDir = outputDir;
     this.execute = execute;
     this.dbRegex = dbRegex;
     this.tableRegex = tableRegex;
     this.tableType = tableType;
+    this.tablePoolSize = tablePoolSize;
   }
 
   public String getOutputDir() {
@@ -70,6 +92,10 @@ public class RunOptions {
     return tableType;
   }
 
+  public int getTablePoolSize() {
+    return tablePoolSize;
+  }
+
   @Override
   public String toString() {
     return "RunOptions{" +
@@ -78,6 +104,7 @@ public class RunOptions {
             ", dbRegex='" + dbRegex + '\'' +
             ", tableRegex='" + tableRegex + '\'' +
             ", tableType=" + tableType +
+            ", tablePoolSize=" + tablePoolSize +
             '}';
   }
 }
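
One behavioral detail worth noting in the RunOptions change above: when -tablePoolSize is omitted, the pool size defaults to Runtime.getRuntime().availableProcessors() (floored at 1), and non-numeric or non-positive values are rejected with an IllegalArgumentException. A minimal restatement of that decision path (the method name is illustrative, not from the commit):

    // Illustrative restatement of the defaulting/validation rules in RunOptions.fromCommandLine.
    static int resolveTablePoolSize(String optionValue) {
      int defaultPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors());
      if (optionValue == null) {
        return defaultPoolSize;                 // flag omitted: one thread per available core
      }
      final int tablePoolSize;
      try {
        tablePoolSize = Integer.parseInt(optionValue);
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
            "Please specify a positive integer option value for tablePoolSize", e);
      }
      if (tablePoolSize < 1) {
        throw new IllegalArgumentException(
            "Please specify a positive integer option value for tablePoolSize");
      }
      return tablePoolSize;
    }
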
diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestCloseableThreadLocal.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestCloseableThreadLocal.java
new file mode 100644
index 0000000..2584a3b
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestCloseableThreadLocal.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class TestCloseableThreadLocal {
+
+  private static class AutoCloseableStub implements AutoCloseable {
+
+    private boolean closed = false;
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+  }
+
+  @Test
+  public void testResourcesAreInitiallyNotClosed() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 1);
+
+    assertThat(closeableThreadLocal.get().isClosed(), is(false));
+  }
+
+  @Test
+  public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+
+    closeableThreadLocal.close();
+
+    assertThat(asyncInstance.isClosed(), is(true));
+    assertThat(syncInstance.isClosed(), is(true));
+  }
+
+  @Test
+  public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub ref1 = closeableThreadLocal.get();
+    AutoCloseableStub ref2 = closeableThreadLocal.get();
+    assertThat(ref1, is(ref2));
+  }
+
+  @Test
+  public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+            new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+    assertThat(asyncInstance, is(not(syncInstance)));
+  }
+}
\ No newline at end of file
diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestRunOptions.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestRunOptions.java
new file mode 100644
index 0000000..8005b5c
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestRunOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static org.apache.hadoop.hive.upgrade.acid.PreUpgradeTool.createCommandLineOptions;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.commons.cli.GnuParser;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestRunOptions {
+
+  @Rule
+  public ExpectedException expectedEx = ExpectedException.none();
+
+  @Test
+  public void testTablePoolSizeIs5WhenSpecified() throws Exception {
+    String[] args = {"-tablePoolSize", "5"};
+    RunOptions runOptions = RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+    assertThat(runOptions.getTablePoolSize(), is(5));
+  }
+
+  @Test
+  public void testExceptionIsThrownWhenTablePoolSizeIsNotANumber() throws Exception {
+    expectedEx.expect(IllegalArgumentException.class);
+    expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize");
+
+    String[] args = {"-tablePoolSize", "notANumber"};
+    RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+  }
+
+  @Test
+  public void testExceptionIsThrownWhenTablePoolSizeIsLessThan1() throws Exception {
+    expectedEx.expect(IllegalArgumentException.class);
+    expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize");
+
+    String[] args = {"-tablePoolSize", "0"};
+    RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+  }
+
+  @Test
+  public void testExceptionIsThrownWhenTablePoolSizeIsNotInteger() throws Exception {
+    expectedEx.expect(IllegalArgumentException.class);
+    expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize");
+
+    String[] args = {"-tablePoolSize", "0.5"};
+    RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+  }
+}