Posted to commits@hive.apache.org by jc...@apache.org on 2019/07/12 04:10:57 UTC
[hive] branch master updated: HIVE-21948: Implement parallel processing in Pre Upgrade Tool (Krisztian Kasa, reviewed by Jesus Camacho Rodriguez)
This is an automated email from the ASF dual-hosted git repository.
jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9d9f1a0 HIVE-21948: Implement parallel processing in Pre Upgrade Tool (Krisztian Kasa, reviewed by Jesus Camacho Rodriguez)
9d9f1a0 is described below
commit 9d9f1a087303d4a166ab05bf26c276b5137b5cc9
Author: Krisztian Kasa <kk...@cloudera.com>
AuthorDate: Thu Jul 11 21:09:33 2019 -0700
HIVE-21948: Implement parallel processing in Pre Upgrade Tool (Krisztian Kasa, reviewed by Jesus Camacho Rodriguez)
---
.../hive/upgrade/acid/CloseableThreadLocal.java | 61 ++++
.../hive/upgrade/acid/CompactTablesState.java | 59 ++++
.../hive/upgrade/acid/CompactionMetaInfo.java | 68 +++++
.../acid/NamedForkJoinWorkerThreadFactory.java | 40 +++
.../hadoop/hive/upgrade/acid/PreUpgradeTool.java | 316 ++++++++++++---------
.../hadoop/hive/upgrade/acid/RunOptions.java | 33 ++-
.../upgrade/acid/TestCloseableThreadLocal.java | 86 ++++++
.../hadoop/hive/upgrade/acid/TestRunOptions.java | 67 +++++
8 files changed, 596 insertions(+), 134 deletions(-)
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
new file mode 100644
index 0000000..fbe0a80
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides functionality similar to java.lang.ThreadLocal.
+ * In addition, it provides a close function to clean up unmanaged resources in all threads where the resource was initialized.
+ * @param <T> - type of resource
+ */
+public class CloseableThreadLocal<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+ private final ConcurrentHashMap<Thread, T> threadLocalMap;
+ private final Supplier<T> initialValue;
+ private final Consumer<T> closeFunction;
+
+ public CloseableThreadLocal(Supplier<T> initialValue, Consumer<T> closeFunction, int poolSize) {
+ this.initialValue = initialValue;
+ threadLocalMap = new ConcurrentHashMap<>(poolSize);
+ this.closeFunction = closeFunction;
+ }
+
+ public T get() {
+ return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+ }
+
+ public void close() {
+ threadLocalMap.values().forEach(this::closeQuietly);
+ }
+
+ private void closeQuietly(T resource) {
+ try {
+ closeFunction.accept(resource);
+ } catch (Exception e) {
+ LOG.warn("Error while closing resource.", e);
+ }
+ }
+}
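
A minimal usage sketch of CloseableThreadLocal, mirroring how this commit wires it up in PreUpgradeTool further below (the try/finally shape and the pool size variable are illustrative, not part of the patch):

    // Each calling thread lazily creates and caches its own resource; close()
    // tears down every per-thread instance, logging failures instead of throwing.
    CloseableThreadLocal<IMetaStoreClient> clients =
        new CloseableThreadLocal<>(this::getHMS, IMetaStoreClient::close, tablePoolSize);
    try {
      IMetaStoreClient hms = clients.get(); // safe to call from any worker thread
      // ... metastore calls ...
    } finally {
      clients.close(); // closes the client of every thread that ever called get()
    }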
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
new file mode 100644
index 0000000..beb934c
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static java.util.Collections.emptyList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stores the result of the database and table scan: compaction commands and meta info.
+ */
+public final class CompactTablesState {
+
+ public static CompactTablesState empty() {
+ return new CompactTablesState(emptyList(), new CompactionMetaInfo());
+ }
+
+ public static CompactTablesState compactions(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+ return new CompactTablesState(compactionCommands, compactionMetaInfo);
+ }
+
+ private final List<String> compactionCommands;
+ private final CompactionMetaInfo compactionMetaInfo;
+
+ private CompactTablesState(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+ this.compactionCommands = compactionCommands;
+ this.compactionMetaInfo = compactionMetaInfo;
+ }
+
+ public List<String> getCompactionCommands() {
+ return compactionCommands;
+ }
+
+ public CompactionMetaInfo getMetaInfo() {
+ return compactionMetaInfo;
+ }
+
+ public CompactTablesState merge(CompactTablesState other) {
+ List<String> compactionCommands = new ArrayList<>(this.compactionCommands);
+ compactionCommands.addAll(other.compactionCommands);
+ return new CompactTablesState(compactionCommands, this.compactionMetaInfo.merge(other.compactionMetaInfo));
+ }
+}
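
The merge method is what lets per-table results be combined in a stream reduction, with empty() as the identity. A hedged sketch (the table name and command text are illustrative only):

    CompactTablesState perTable = CompactTablesState.compactions(
        java.util.Collections.singletonList("ALTER TABLE d.t COMPACT 'major';"),
        new CompactionMetaInfo());
    // merge concatenates the command lists and combines the meta info
    CompactTablesState total = CompactTablesState.empty().merge(perTable);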
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
new file mode 100644
index 0000000..72b4ec6
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Stores the results of compaction calls.
+ */
+public class CompactionMetaInfo {
+ /**
+ * total number of bytes to be compacted across all compaction commands.
+ */
+ private long numberOfBytes;
+ /**
+ * IDs of compactions launched by this utility.
+ */
+ private final Set<Long> compactionIds;
+
+ public CompactionMetaInfo() {
+ compactionIds = new HashSet<>();
+ numberOfBytes = 0;
+ }
+
+ private CompactionMetaInfo(Set<Long> initialCompactionIds, long initialNumberOfBytes) {
+ this.compactionIds = new HashSet<>(initialCompactionIds);
+ numberOfBytes = initialNumberOfBytes;
+ }
+
+ public CompactionMetaInfo merge(CompactionMetaInfo other) {
+ CompactionMetaInfo result = new CompactionMetaInfo(this.compactionIds, this.numberOfBytes);
+ result.numberOfBytes += other.numberOfBytes;
+ result.compactionIds.addAll(other.compactionIds);
+ return result;
+ }
+
+ public long getNumberOfBytes() {
+ return numberOfBytes;
+ }
+
+ public void addBytes(long bytes) {
+ numberOfBytes += bytes;
+ }
+
+ public Set<Long> getCompactionIds() {
+ return compactionIds;
+ }
+
+ public void addCompactionId(long compactionId) {
+ compactionIds.add(compactionId);
+ }
+}
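
Note that merge is copy-on-write: it returns a new instance and leaves both operands untouched, which keeps the parallel reduction later in this commit safe. A quick illustration (values are arbitrary):

    CompactionMetaInfo a = new CompactionMetaInfo();
    a.addBytes(1024L);
    a.addCompactionId(42L);
    CompactionMetaInfo b = new CompactionMetaInfo();
    b.addBytes(2048L);
    CompactionMetaInfo c = a.merge(b);
    // c.getNumberOfBytes() == 3072 and c.getCompactionIds() == {42};
    // a and b are unchanged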
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
new file mode 100644
index 0000000..2b95f7b
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+
+/**
+ * This class allows specifying a prefix for ForkJoinPool thread names.
+ */
+public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+
+ NamedForkJoinWorkerThreadFactory(String namePrefix) {
+ this.namePrefix = namePrefix;
+ }
+
+ private final String namePrefix;
+
+ @Override
+ public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+ ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+ worker.setName(namePrefix + worker.getName());
+ return worker;
+ }
+}
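
A hedged construction example (the parallelism value and exception handler below are assumptions; the "Table-" prefix matches what PreUpgradeTool passes in this commit):

    // Workers come out named e.g. "Table-ForkJoinPool-1-worker-1", which makes
    // thread dumps and log lines from the table-processing pool easy to attribute.
    ForkJoinPool pool = new ForkJoinPool(
        4,                                               // parallelism
        new NamedForkJoinWorkerThreadFactory("Table-"),
        (t, e) -> e.printStackTrace(),                   // uncaught exception handler
        false);                                          // asyncMode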
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
index 0a7354d..5b0ad7c 100644
--- a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
@@ -29,9 +29,8 @@ import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -68,8 +68,8 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.security.AccessControlException;
@@ -116,26 +116,24 @@ import com.google.common.annotations.VisibleForTesting;
*
* See also org.apache.hadoop.hive.ql.util.UpgradeTool in Hive 3.x
*/
-public class PreUpgradeTool {
+public class PreUpgradeTool implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class);
private static final int PARTITION_BATCH_SIZE = 10000;
- private final Options cmdLineOptions = new Options();
public static void main(String[] args) throws Exception {
- PreUpgradeTool tool = new PreUpgradeTool();
- tool.init();
+ Options cmdLineOptions = createCommandLineOptions();
CommandLineParser parser = new GnuParser();
CommandLine line ;
try {
- line = parser.parse(tool.cmdLineOptions, args);
+ line = parser.parse(cmdLineOptions, args);
} catch (ParseException e) {
System.err.println("PreUpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
- printAndExit(tool);
+ printAndExit(cmdLineOptions);
return;
}
if (line.hasOption("help")) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+ formatter.printHelp("upgrade-acid", cmdLineOptions);
return;
}
RunOptions runOptions = RunOptions.fromCommandLine(line);
@@ -144,25 +142,53 @@ public class PreUpgradeTool {
try {
String hiveVer = HiveVersionInfo.getShortVersion();
LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " +
- HiveVersionInfo.getBuildVersion());
+ HiveVersionInfo.getBuildVersion());
if(!hiveVer.startsWith("2.")) {
throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer);
}
- tool.prepareAcidUpgradeInternal(runOptions);
+ try (PreUpgradeTool tool = new PreUpgradeTool(runOptions)) {
+ tool.prepareAcidUpgradeInternal();
+ }
}
catch(Exception ex) {
LOG.error("PreUpgradeTool failed", ex);
throw ex;
}
}
- private static void printAndExit(PreUpgradeTool tool) {
+
+ private final HiveConf conf;
+ private final CloseableThreadLocal<IMetaStoreClient> metaStoreClient;
+ private final ThreadLocal<ValidTxnList> txns;
+ private final RunOptions runOptions;
+
+ public PreUpgradeTool(RunOptions runOptions) {
+ this.runOptions = runOptions;
+ this.conf = hiveConf != null ? hiveConf : new HiveConf();
+ this.metaStoreClient = new CloseableThreadLocal<>(this::getHMS, IMetaStoreClient::close,
+ runOptions.getTablePoolSize());
+ this.txns = ThreadLocal.withInitial(() -> {
+ /*
+ This API changed from 2.x to 3.0. so this won't even compile with 3.0
+ but it doesn't need to since we only run this preUpgrade
+ */
+ try {
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ return TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private static void printAndExit(Options cmdLineOptions) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+ formatter.printHelp("upgrade-acid", cmdLineOptions);
System.exit(1);
}
- private void init() {
+ static Options createCommandLineOptions() {
try {
+ Options cmdLineOptions = new Options();
cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" +
" cluster. This requires 2.x binaries on the classpath and hive-site.xml."));
Option exec = new Option("execute",
@@ -196,23 +222,32 @@ public class PreUpgradeTool {
tableTypeOption.setArgs(1);
tableTypeOption.setArgName("table type");
cmdLineOptions.addOption(tableTypeOption);
+
+ Option tablePoolSizeOption = new Option("tn", "Number of threads to process tables.");
+ tablePoolSizeOption.setLongOpt("tablePoolSize");
+ tablePoolSizeOption.setArgs(1);
+ tablePoolSizeOption.setArgName("pool size");
+ cmdLineOptions.addOption(tablePoolSizeOption);
+
+ return cmdLineOptions;
}
catch(Exception ex) {
LOG.error("init()", ex);
throw ex;
}
}
+
private static HiveMetaHookLoader getHookLoader() {
return new HiveMetaHookLoader() {
@Override
public HiveMetaHook getHook(
- org.apache.hadoop.hive.metastore.api.Table tbl) {
+ org.apache.hadoop.hive.metastore.api.Table tbl) {
return null;
}
};
}
- private static IMetaStoreClient getHMS(HiveConf conf) {
+ public IMetaStoreClient getHMS() {
UserGroupInformation loggedInUser = null;
try {
loggedInUser = UserGroupInformation.getLoginUser();
@@ -229,90 +264,51 @@ public class PreUpgradeTool {
which calls HiveMetaStoreClient(HiveConf, Boolean) which exists in
(at least) 2.1.0.2.6.5.0-292 and later but not in 2.1.0.2.6.0.3-8 (the HDP 2.6 release)
i.e. RetryingMetaStoreClient.getProxy(conf, true) is broken in 2.6.0*/
- return RetryingMetaStoreClient.getProxy(conf,
- new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class},
- new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName());
- } catch (MetaException e) {
+ IMetaStoreClient client = RetryingMetaStoreClient.getProxy(conf,
+ new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class},
+ new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName());
+ if (hiveConf != null) {
+ SessionState ss = SessionState.start(conf);
+ ss.applyAuthorizationPolicy();
+ }
+ return client;
+ } catch (MetaException | HiveException e) {
throw new RuntimeException("Error connecting to Hive Metastore URI: "
- + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e);
+ + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e);
}
}
/**
* todo: change script comments to a preamble instead of a footer
*/
- private void prepareAcidUpgradeInternal(RunOptions runOptions)
+ private void prepareAcidUpgradeInternal()
throws HiveException, TException, IOException {
- HiveConf conf = hiveConf != null ? hiveConf : new HiveConf();
- boolean isAcidEnabled = isAcidEnabled(conf);
- IMetaStoreClient hms = getHMS(conf);
+ if (!isAcidEnabled(conf)) {
+ LOG.info("acid is off, there can't be any acid tables - nothing to compact");
+ return;
+ }
+ IMetaStoreClient hms = metaStoreClient.get();
LOG.debug("Looking for databases");
String exceptionMsg = null;
List<String> databases;
- List<String> compactions = new ArrayList<>();
- final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
- ValidTxnList txns = null;
- Hive db = null;
+ CompactTablesState compactTablesState;
try {
databases = hms.getDatabases(runOptions.getDbRegex()); //TException
LOG.debug("Found " + databases.size() + " databases to process");
- if (runOptions.isExecute()) {
- db = Hive.get(conf);
- }
- for (String dbName : databases) {
- try {
- List<String> tables;
- if (runOptions.getTableType() == null) {
- tables = hms.getTables(dbName, runOptions.getTableRegex());
- LOG.debug("found {} tables in {}", tables.size(), dbName);
- } else {
- tables = hms.getTables(dbName, runOptions.getTableRegex(), runOptions.getTableType());
- LOG.debug("found {} {} in {}", tables.size(), runOptions.getTableType().name(), dbName);
- }
- for (String tableName : tables) {
- try {
- Table t = hms.getTable(dbName, tableName);
- LOG.debug("processing table " + Warehouse.getQualifiedName(t));
- if (isAcidEnabled) {
- //if acid is off, there can't be any acid tables - nothing to compact
- if (txns == null) {
- /*
- This API changed from 2.x to 3.0. so this won't even compile with 3.0
- but it doesn't need to since we only run this preUpgrade
- */
- TxnStore txnHandler = TxnUtils.getTxnStore(conf);
- txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
- }
- List<String> compactionCommands =
- getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), db, txns);
- compactions.addAll(compactionCommands);
- }
- /*todo: handle renaming files somewhere*/
- } catch (Exception e) {
- if (isAccessControlException(e)) {
- // this could be external table with 0 permission for hive user
- exceptionMsg = "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " +
- "to databases and tables to determine if a table has to be compacted. " +
- "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
- "false to allow read-access to databases and tables and retry the pre-upgrade tool again..";
- }
- throw e;
- }
- }
- } catch (Exception e) {
- if (exceptionMsg == null && isAccessControlException(e)) {
- // we may not have access to read all tables from this db
- exceptionMsg = "Unable to access " + dbName + ". Pre-upgrade tool requires read-access " +
- "to databases and tables to determine if a table has to be compacted. " +
- "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
- "false to allow read-access to databases and tables and retry the pre-upgrade tool again..";
- }
- throw e;
- }
- }
+ ForkJoinPool processTablePool = new ForkJoinPool(
+ runOptions.getTablePoolSize(),
+ new NamedForkJoinWorkerThreadFactory("Table-"),
+ getUncaughtExceptionHandler(),
+ false
+ );
+ compactTablesState = databases.stream()
+ .map(dbName -> processDatabase(dbName, processTablePool, runOptions))
+ .reduce(CompactTablesState::merge)
+ .orElse(CompactTablesState.empty());
+
} catch (Exception e) {
- if (exceptionMsg == null && isAccessControlException(e)) {
+ if (isAccessControlException(e)) {
exceptionMsg = "Unable to get databases. Pre-upgrade tool requires read-access " +
"to databases and tables to determine if a table has to be compacted. " +
"Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
@@ -321,27 +317,27 @@ public class PreUpgradeTool {
throw new HiveException(exceptionMsg, e);
}
- makeCompactionScript(compactions, runOptions.getOutputDir(), compactionMetaInfo);
+ makeCompactionScript(compactTablesState, runOptions.getOutputDir());
if(runOptions.isExecute()) {
- while(compactionMetaInfo.compactionIds.size() > 0) {
- LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() +
+ while(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) {
+ LOG.debug("Will wait for " + compactTablesState.getMetaInfo().getCompactionIds().size() +
" compactions to complete");
- ShowCompactResponse resp = db.showCompactions();
+ ShowCompactResponse resp = hms.showCompactions();
for(ShowCompactResponseElement e : resp.getCompacts()) {
final String state = e.getState();
boolean removed;
switch (state) {
case TxnStore.CLEANING_RESPONSE:
case TxnStore.SUCCEEDED_RESPONSE:
- removed = compactionMetaInfo.compactionIds.remove(e.getId());
+ removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId());
if(removed) {
LOG.debug("Required compaction succeeded: " + e.toString());
}
break;
case TxnStore.ATTEMPTED_RESPONSE:
case TxnStore.FAILED_RESPONSE:
- removed = compactionMetaInfo.compactionIds.remove(e.getId());
+ removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId());
if(removed) {
LOG.warn("Required compaction failed: " + e.toString());
}
@@ -357,7 +353,7 @@ public class PreUpgradeTool {
LOG.error("Unexpected state for : " + e.toString());
}
}
- if(compactionMetaInfo.compactionIds.size() > 0) {
+ if(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) {
try {
if (callback != null) {
callback.onWaitForCompaction();
@@ -371,6 +367,67 @@ public class PreUpgradeTool {
}
}
+ private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e);
+ }
+
+ private CompactTablesState processDatabase(
+ String dbName, ForkJoinPool threadPool, RunOptions runOptions) {
+ try {
+ IMetaStoreClient hms = metaStoreClient.get();
+
+ List<String> tables;
+ if (runOptions.getTableType() == null) {
+ tables = hms.getTables(dbName, runOptions.getTableRegex());
+ LOG.debug("found {} tables in {}", tables.size(), dbName);
+ }
+ else {
+ tables = hms.getTables(dbName, runOptions.getTableRegex(), runOptions.getTableType());
+ LOG.debug("found {} {} in {}", tables.size(), runOptions.getTableType().name(), dbName);
+ }
+
+ return threadPool.submit(
+ () -> tables.parallelStream()
+ .map(table -> processTable(dbName, table, runOptions))
+ .reduce(CompactTablesState::merge)).get()
+ .orElse(CompactTablesState.empty());
+ } catch (Exception e) {
+ if (isAccessControlException(e)) {
+ // we may not have access to read all tables from this db
+ throw new RuntimeException("Unable to access " + dbName + ". Pre-upgrade tool requires read-access " +
+ "to databases and tables to determine if a table has to be compacted. " +
+ "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
+ "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ private CompactTablesState processTable(
+ String dbName, String tableName, RunOptions runOptions) {
+ try {
+ IMetaStoreClient hms = metaStoreClient.get();
+ final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+
+ Table t = hms.getTable(dbName, tableName);
+ LOG.debug("processing table " + Warehouse.getQualifiedName(t));
+ List<String> compactionCommands =
+ getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), txns.get());
+ return CompactTablesState.compactions(compactionCommands, compactionMetaInfo);
+ /*todo: handle renaming files somewhere*/
+ } catch (Exception e) {
+ if (isAccessControlException(e)) {
+ // this could be external table with 0 permission for hive user
+ throw new RuntimeException(
+ "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " +
+ "to databases and tables to determine if a table has to be compacted. " +
+ "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " +
+ "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
private boolean isAccessControlException(final Exception e) {
// hadoop security AccessControlException
if ((e instanceof MetaException && e.getCause() instanceof AccessControlException) ||
@@ -391,25 +448,25 @@ public class PreUpgradeTool {
/**
* Generates a set compaction commands to run on pre Hive 3 cluster
*/
- private static void makeCompactionScript(List<String> commands, String scriptLocation,
- CompactionMetaInfo compactionMetaInfo) throws IOException {
- if (commands.isEmpty()) {
+ private static void makeCompactionScript(CompactTablesState result, String scriptLocation) throws IOException {
+ if (result.getCompactionCommands().isEmpty()) {
LOG.info("No compaction is necessary");
return;
}
String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
LOG.debug("Writing compaction commands to " + fileName);
- try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
+ try(PrintWriter pw = createScript(
+ result.getCompactionCommands(), fileName, scriptLocation)) {
//add post script
- pw.println("-- Generated total of " + commands.size() + " compaction commands");
- if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+ pw.println("-- Generated total of " + result.getCompactionCommands().size() + " compaction commands");
+ if(result.getMetaInfo().getNumberOfBytes() < Math.pow(2, 20)) {
//to see it working in UTs
pw.println("-- The total volume of data to be compacted is " +
- String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+ String.format("%.6fMB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 20)));
}
else {
pw.println("-- The total volume of data to be compacted is " +
- String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+ String.format("%.3fGB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 30)));
}
pw.println();
//todo: should be at the top of the file...
@@ -438,7 +495,7 @@ public class PreUpgradeTool {
* @return any compaction commands to run for {@code Table t}
*/
private static List<String> getCompactionCommands(Table t, HiveConf conf,
- IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db,
+ IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute,
ValidTxnList txns) throws IOException, TException, HiveException {
if(!isFullAcidTable(t)) {
return Collections.emptyList();
@@ -452,7 +509,7 @@ public class PreUpgradeTool {
List<String> cmds = new ArrayList<>();
cmds.add(getCompactionCommand(t, null));
if(execute) {
- scheduleCompaction(t, null, db, compactionMetaInfo);
+ scheduleCompaction(t, null, hms, compactionMetaInfo);
}
return cmds;
}
@@ -463,19 +520,19 @@ public class PreUpgradeTool {
for(int i = 0; i < numWholeBatches; i++) {
List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
partNames.subList(i * batchSize, (i + 1) * batchSize));
- getCompactionCommands(t, partitionList, db, execute, compactionCommands,
+ getCompactionCommands(t, partitionList, hms, execute, compactionCommands,
compactionMetaInfo, conf, txns);
}
if(numWholeBatches * batchSize < partNames.size()) {
//last partial batch
List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
partNames.subList(numWholeBatches * batchSize, partNames.size()));
- getCompactionCommands(t, partitionList, db, execute, compactionCommands,
+ getCompactionCommands(t, partitionList, hms, execute, compactionCommands,
compactionMetaInfo, conf, txns);
}
return compactionCommands;
}
- private static void getCompactionCommands(Table t, List<Partition> partitionList, Hive db,
+ private static void getCompactionCommands(Table t, List<Partition> partitionList, IMetaStoreClient hms,
boolean execute, List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo,
HiveConf conf, ValidTxnList txns)
throws IOException, TException, HiveException {
@@ -483,28 +540,31 @@ public class PreUpgradeTool {
if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) {
compactionCommands.add(getCompactionCommand(t, p));
if (execute) {
- scheduleCompaction(t, p, db, compactionMetaInfo);
+ scheduleCompaction(t, p, hms, compactionMetaInfo);
}
}
}
}
- private static void scheduleCompaction(Table t, Partition p, Hive db,
+ private static void scheduleCompaction(Table t, Partition p, IMetaStoreClient db,
CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException {
String partName = p == null ? null :
Warehouse.makePartName(t.getPartitionKeys(), p.getValues());
- CompactionResponse resp =
- //this gives an easy way to get at compaction ID so we can only wait for those this
- //utility started
- db.compact2(t.getDbName(), t.getTableName(), partName, "major", null);
- if(!resp.isAccepted()) {
- LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) +
- " is already being compacted with id=" + resp.getId());
- }
- else {
- LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) +
- (p == null ? "" : "/" + partName) + " with id=" + resp.getId());
- }
- compactionMetaInfo.compactionIds.add(resp.getId());
+ try {
+ CompactionResponse resp =
+ //this gives an easy way to get at compaction ID so we can only wait for those this
+ //utility started
+ db.compact2(t.getDbName(), t.getTableName(), partName, CompactionType.MAJOR, null);
+ if (!resp.isAccepted()) {
+ LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) +
+ " is already being compacted with id=" + resp.getId());
+ } else {
+ LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) +
+ (p == null ? "" : "/" + partName) + " with id=" + resp.getId());
+ }
+ compactionMetaInfo.addCompactionId(resp.getId());
+ } catch (TException e) {
+ throw new HiveException(e);
+ }
}
/**
@@ -547,14 +607,14 @@ public class PreUpgradeTool {
}
if(needsCompaction(bucket, fs)) {
//found delete events - this 'location' needs compacting
- compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+ compactionMetaInfo.addBytes(getDataSize(location, conf));
//if there are un-compacted original files, they will be included in compaction, so
//count at the size for 'cost' estimation later
for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) {
FileStatus fileStatus = origFile.getFileStatus();
if(fileStatus != null) {
- compactionMetaInfo.numberOfBytes += fileStatus.getLen();
+ compactionMetaInfo.addBytes(fileStatus.getLen());
}
}
return true;
@@ -664,15 +724,9 @@ public class PreUpgradeTool {
return txnMgr.equals(dbTxnMgr) && concurrency;
}
- private static class CompactionMetaInfo {
- /**
- * total number of bytes to be compacted across all compaction commands
- */
- long numberOfBytes;
- /**
- * IDs of compactions launched by this utility
- */
- Set<Long> compactionIds = new HashSet<>();
+ @Override
+ public void close() {
+ metaStoreClient.close();
}
@VisibleForTesting
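
Condensed, the new control flow in prepareAcidUpgradeInternal/processDatabase above is a two-level fan-out with a merge-based reduction: databases are walked sequentially, tables within each database in parallel on the bounded ForkJoinPool, and per-table results reduce into one CompactTablesState. A simplified sketch, assuming the java.util and java.util.concurrent imports used in the patch (listTables and processTable are placeholders for the metastore calls, not the commit's exact signatures):

    ForkJoinPool tablePool = new ForkJoinPool(runOptions.getTablePoolSize(),
        new NamedForkJoinWorkerThreadFactory("Table-"), uncaughtHandler, false);
    CompactTablesState result = databases.stream()          // databases, sequentially
        .map(dbName -> {
          List<String> tables = listTables(dbName);         // hms.getTables(...)
          try {
            // parallelStream() submitted from inside a ForkJoinPool task runs
            // on that pool, so table parallelism stays bounded by tablePoolSize
            return tablePool.submit(() -> tables.parallelStream()
                .map(table -> processTable(dbName, table))
                .reduce(CompactTablesState::merge)).get()
                .orElse(CompactTablesState.empty());
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        })
        .reduce(CompactTablesState::merge)
        .orElse(CompactTablesState.empty());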
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java
index 66213d4..534b971 100644
--- a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java
@@ -27,13 +27,33 @@ public class RunOptions {
public static RunOptions fromCommandLine(CommandLine commandLine) {
String tableTypeText = commandLine.getOptionValue("tableType");
+
+ int defaultPoolSize = Runtime.getRuntime().availableProcessors();
+ if (defaultPoolSize < 1)
+ defaultPoolSize = 1;
+
+ int tablePoolSize = getIntOptionValue(commandLine, "tablePoolSize", defaultPoolSize);
+ if (tablePoolSize < 1)
+ throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize");
+
return new RunOptions(
commandLine.getOptionValue("location", "."),
commandLine.hasOption("execute"),
commandLine.getOptionValue("dbRegex", ".*"),
commandLine.getOptionValue("tableRegex", ".*"),
- tableTypeText == null ? null : TableType.valueOf(tableTypeText)
- );
+ tableTypeText == null ? null : TableType.valueOf(tableTypeText),
+ tablePoolSize);
+ }
+
+ private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) {
+ if (commandLine.hasOption(optionName)) {
+ try {
+ return Integer.parseInt(commandLine.getOptionValue(optionName));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e);
+ }
+ }
+ return defaultValue;
}
private final String outputDir;
@@ -41,13 +61,15 @@ public class RunOptions {
private final String dbRegex;
private final String tableRegex;
private final TableType tableType;
+ private final int tablePoolSize;
- public RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, TableType tableType) {
+ private RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, TableType tableType, int tablePoolSize) {
this.outputDir = outputDir;
this.execute = execute;
this.dbRegex = dbRegex;
this.tableRegex = tableRegex;
this.tableType = tableType;
+ this.tablePoolSize = tablePoolSize;
}
public String getOutputDir() {
@@ -70,6 +92,10 @@ public class RunOptions {
return tableType;
}
+ public int getTablePoolSize() {
+ return tablePoolSize;
+ }
+
@Override
public String toString() {
return "RunOptions{" +
@@ -78,6 +104,7 @@ public class RunOptions {
", dbRegex='" + dbRegex + '\'' +
", tableRegex='" + tableRegex + '\'' +
", tableType=" + tableType +
+ ", tablePoolSize=" + tablePoolSize +
'}';
}
}
diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestCloseableThreadLocal.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestCloseableThreadLocal.java
new file mode 100644
index 0000000..2584a3b
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestCloseableThreadLocal.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class TestCloseableThreadLocal {
+
+ private static class AutoCloseableStub implements AutoCloseable {
+
+ private boolean closed = false;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+ }
+
+ @Test
+ public void testResourcesAreInitiallyNotClosed() {
+ CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+ new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 1);
+
+ assertThat(closeableThreadLocal.get().isClosed(), is(false));
+ }
+
+ @Test
+ public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+ CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+ new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+ AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+ AutoCloseableStub syncInstance = closeableThreadLocal.get();
+
+ closeableThreadLocal.close();
+
+ assertThat(asyncInstance.isClosed(), is(true));
+ assertThat(syncInstance.isClosed(), is(true));
+ }
+
+ @Test
+ public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+ CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+ new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+ AutoCloseableStub ref1 = closeableThreadLocal.get();
+ AutoCloseableStub ref2 = closeableThreadLocal.get();
+ assertThat(ref1, is(ref2));
+ }
+
+ @Test
+ public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+ CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+ new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+ AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+ AutoCloseableStub syncInstance = closeableThreadLocal.get();
+ assertThat(asyncInstance, is(not(syncInstance)));
+ }
+}
\ No newline at end of file
diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestRunOptions.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestRunOptions.java
new file mode 100644
index 0000000..8005b5c
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestRunOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static org.apache.hadoop.hive.upgrade.acid.PreUpgradeTool.createCommandLineOptions;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.commons.cli.GnuParser;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestRunOptions {
+
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ @Test
+ public void testTablePoolSizeIs5WhenSpecified() throws Exception {
+ String[] args = {"-tablePoolSize", "5"};
+ RunOptions runOptions = RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+ assertThat(runOptions.getTablePoolSize(), is(5));
+ }
+
+ @Test
+ public void testExceptionIsThrownWhenTablePoolSizeIsNotANumber() throws Exception {
+ expectedEx.expect(IllegalArgumentException.class);
+ expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize");
+
+ String[] args = {"-tablePoolSize", "notANumber"};
+ RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+ }
+
+ @Test
+ public void testExceptionIsThrownWhenTablePoolSizeIsLessThan1() throws Exception {
+ expectedEx.expect(IllegalArgumentException.class);
+ expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize");
+
+ String[] args = {"-tablePoolSize", "0"};
+ RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+ }
+
+ @Test
+ public void testExceptionIsThrownWhenTablePoolSizeIsNotInteger() throws Exception {
+ expectedEx.expect(IllegalArgumentException.class);
+ expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize");
+
+ String[] args = {"-tablePoolSize", "0.5"};
+ RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args));
+ }
+}