You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@hive.apache.org by ve...@apache.org on 2023/03/03 14:21:18 UTC
[hive] branch master updated: HIVE-27019: Split Cleaner into separate manageable modular entities (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 98b3861de9b HIVE-27019: Split Cleaner into separate manageable modular entities (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
98b3861de9b is described below
commit 98b3861de9b7426468fe935d0f8f320fcc5122d3
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Fri Mar 3 19:51:03 2023 +0530
HIVE-27019: Split Cleaner into separate manageable modular entities (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 467 +++------------------
.../hive/ql/txn/compactor/CleanupRequest.java | 115 +++++
.../hive/ql/txn/compactor/CompactorUtil.java | 67 +++
.../hadoop/hive/ql/txn/compactor/FSRemover.java | 126 ++++++
.../hadoop/hive/ql/txn/compactor/Initiator.java | 19 +-
.../ql/txn/compactor/MetaStoreCompactorThread.java | 48 +--
.../hive/ql/txn/compactor/MetadataCache.java | 53 +++
.../txn/compactor/handler/CompactionCleaner.java | 389 +++++++++++++++++
.../hive/ql/txn/compactor/handler/TaskHandler.java | 92 ++++
.../txn/compactor/handler/TaskHandlerFactory.java | 49 +++
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 50 +--
.../hive/ql/txn/compactor/handler/TestHandler.java | 111 +++++
12 files changed, 1083 insertions(+), 503 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 509343041ac..2325c1527d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,76 +17,27 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.FileUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.Ref;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import static org.apache.commons.collections4.ListUtils.subtract;
import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
-import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
/**
* A class to clean directories after compactions. This will run in a separate thread.
@@ -97,23 +48,25 @@ public class Cleaner extends MetaStoreCompactorThread {
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private boolean metricsEnabled = false;
- private ReplChangeManager replChangeManager;
private ExecutorService cleanerExecutor;
+ private List<TaskHandler> cleanupHandlers;
@Override
public void init(AtomicBoolean stop) throws Exception {
super.init(stop);
- replChangeManager = ReplChangeManager.getInstance(conf);
checkInterval = conf.getTimeVar(
HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+ metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+ MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
- metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
- MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
- boolean tableCacheOn = MetastoreConf.getBoolVar(conf,
- MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
- initializeCache(tableCacheOn);
+ if (CollectionUtils.isEmpty(cleanupHandlers)) {
+ FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+ cleanupHandlers = TaskHandlerFactory.getInstance()
+ .getHandlers(conf, txnHandler, metadataCache,
+ metricsEnabled, fsRemover);
+ }
}
@Override
@@ -122,11 +75,8 @@ public class Cleaner extends MetaStoreCompactorThread {
try {
do {
TxnStore.MutexAPI.LockHandle handle = null;
- invalidateMetaCache();
+ metadataCache.invalidate();
long startedAt = -1;
- long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
- ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
- : 0;
// Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
@@ -140,48 +90,38 @@ public class Cleaner extends MetaStoreCompactorThread {
HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL, TimeUnit.MILLISECONDS),
new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
}
- long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
- checkInterrupt();
- List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
- checkInterrupt();
-
- if (!readyToClean.isEmpty()) {
- long minTxnIdSeenOpen = Math.min(minOpenTxnId, txnHandler.findMinTxnIdSeenOpen());
-
- List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
- // For checking which compaction can be cleaned we can use the minOpenTxnId
- // However findReadyToClean will return all records that were compacted with old version of HMS
- // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
- // to the clean method, to avoid cleaning up deltas needed for running queries
- // when min_history_level is finally dropped, than every HMS will commit compaction the new way
- // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
- for (CompactionInfo ci : readyToClean) {
- //Check for interruption before scheduling each compactionInfo and return if necessary
+ for (TaskHandler cleanupHandler : cleanupHandlers) {
+ try {
checkInterrupt();
- long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
-
- CompletableFuture<Void> asyncJob =
- CompletableFuture.runAsync(
- ThrowingRunnable.unchecked(() -> {
- LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
- clean(ci, cleanerWaterMark, metricsEnabled);
- }), cleanerExecutor)
- .exceptionally(t -> {
- LOG.error("Error clearing {}", ci.getFullPartitionName(), t);
- return null;
- });
- cleanerList.add(asyncJob);
+ List<Runnable> tasks = cleanupHandler.getTasks();
+ List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
+ for (Runnable task : tasks) {
+ CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
+ task, cleanerExecutor)
+ .exceptionally(t -> {
+ LOG.error("Error clearing due to :", t);
+ return null;
+ });
+ asyncTasks.add(asyncTask);
+ }
+ //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
+ CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Throwable t) {
+ LOG.error("Caught an exception while executing RequestHandler loop : {} of compactor cleaner, {}",
+ cleanupHandler.getClass().getName(), t.getMessage());
+ throw t;
}
-
- //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
- CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).get();
}
+ checkInterrupt();
} catch (InterruptedException e) {
- // do not ignore interruption requests
+ Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
- LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+ LOG.error("Caught an exception in the main loop of compactor cleaner, {}",
StringUtils.stringifyException(t));
} finally {
if (handle != null) {
@@ -196,7 +136,8 @@ public class Cleaner extends MetaStoreCompactorThread {
doPostLoopActions(System.currentTimeMillis() - startedAt);
} while (!stop.get());
} catch (InterruptedException ie) {
- LOG.error("Compactor cleaner thread interrupted, exiting " +
+ Thread.currentThread().interrupt();
+ LOG.error("Compactor cleaner thread interrupted, exiting {}",
StringUtils.stringifyException(ie));
} finally {
if (Thread.currentThread().isInterrupted()) {
@@ -208,326 +149,15 @@ public class Cleaner extends MetaStoreCompactorThread {
}
}
- private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
- LOG.info("Starting cleaning for " + ci);
- PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
- String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
- (ci.type != null ? ci.type.toString().toLowerCase() : null);
- try {
- if (metricsEnabled) {
- perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
- }
- final String location = ci.getProperty("location");
-
- Callable<Boolean> cleanUpTask;
- Table t = null;
- Partition p = null;
-
- if (location == null) {
- t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
- if (t == null) {
- // The table was dropped before we got around to cleaning it.
- LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
- idWatermark(ci));
- txnHandler.markCleaned(ci);
- return;
- }
- if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
- // The table was marked no clean up true.
- LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
- txnHandler.markCleaned(ci);
- return;
- }
- if (ci.partName != null) {
- p = resolvePartition(ci);
- if (p == null) {
- // The partition was dropped before we got around to cleaning it.
- LOG.info("Unable to find partition " + ci.getFullPartitionName() +
- ", assuming it was dropped." + idWatermark(ci));
- txnHandler.markCleaned(ci);
- return;
- }
- if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
- // The partition was marked no clean up true.
- LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
- txnHandler.markCleaned(ci);
- return;
- }
- }
- }
- txnHandler.markCleanerStart(ci);
-
- if (t != null || ci.partName != null) {
- String path = location == null
- ? resolveStorageDescriptor(t, p).getLocation()
- : location;
- boolean dropPartition = ci.partName != null && p == null;
- cleanUpTask = () -> removeFiles(path, minOpenTxn, ci, dropPartition);
- } else {
- cleanUpTask = () -> removeFiles(location, ci);
- }
-
- Ref<Boolean> removedFiles = Ref.from(false);
- if (runJobAsSelf(ci.runAs)) {
- removedFiles.value = cleanUpTask.call();
- } else {
- LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
- UserGroupInformation.getLoginUser());
- try {
- ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
- removedFiles.value = cleanUpTask.call();
- return null;
- });
- } finally {
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
- ci.getFullPartitionName() + idWatermark(ci), exception);
- }
- }
- }
- if (removedFiles.value || isDynPartAbort(t, ci)) {
- txnHandler.markCleaned(ci);
- } else {
- txnHandler.clearCleanerStart(ci);
- LOG.warn("No files were removed. Leaving queue entry " + ci + " in ready for cleaning state.");
- }
- } catch (Exception e) {
- LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
- StringUtils.stringifyException(e));
- ci.errorMessage = e.getMessage();
- if (metricsEnabled) {
- Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
- }
- handleCleanerAttemptFailure(ci);
- } finally {
- if (metricsEnabled) {
- perfLogger.perfLogEnd(CLASS_NAME, cleanerMetric);
- }
- }
- }
-
- private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
- long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
- int cleanAttempts = 0;
- if (ci.retryRetention > 0) {
- cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
- }
- if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
- //Mark it as failed if the max attempt threshold is reached.
- txnHandler.markFailed(ci);
- } else {
- //Calculate retry retention time and update record.
- ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
- txnHandler.setCleanerRetryRetentionTimeOnError(ci);
- }
- }
-
- private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
- throws NoSuchTxnException, MetaException {
- List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
- GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
- request.setValidTxnList(validTxnList.writeToString());
- GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
- // we could have no write IDs for a table if it was never written to but
- // since we are in the Cleaner phase of compactions, there must have
- // been some delta/base dirs
- assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
- ValidReaderWriteIdList validWriteIdList =
- TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
- /*
- * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
- * If we have a higher retentionTime it is possible for a second compaction to run on the same partition. Cleaning up the first compaction
- * should not touch the newer obsolete directories to not to violate the retentionTime for those.
- */
- if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
- validWriteIdList = validWriteIdList.updateHighWatermark(ci.highestWriteId);
- }
- return validWriteIdList;
- }
-
- private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
- return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> pk.size() > 0).isPresent()
- && ci.partName == null;
- }
-
- private static String idWatermark(CompactionInfo ci) {
- return " id=" + ci.id;
- }
-
- private boolean removeFiles(String location, long minOpenTxn, CompactionInfo ci, boolean dropPartition)
- throws Exception {
-
- if (dropPartition) {
- LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
- LockResponse res = null;
-
- try {
- res = txnHandler.lock(lockRequest);
- if (res.getState() == LockState.ACQUIRED) {
- //check if partition wasn't re-created
- if (resolvePartition(ci) == null) {
- return removeFiles(location, ci);
- }
- }
- } catch (NoSuchTxnException | TxnAbortedException e) {
- LOG.error(e.getMessage());
- } finally {
- if (res != null) {
- try {
- txnHandler.unlock(new UnlockRequest(res.getLockid()));
- } catch (NoSuchLockException | TxnOpenException e) {
- LOG.error(e.getMessage());
- }
- }
- }
- }
-
- ValidTxnList validTxnList =
- TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn);
- //save it so that getAcidState() sees it
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
- /**
- * {@code validTxnList} is capped by minOpenTxn so if
- * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
- * produced by a compactor, that means every reader that could be active right now see it
- * as well. That means if this base/delta shadows some earlier base/delta, the it will be
- * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
- *
- *
- * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
- * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
- * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
- * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
- * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
- * writeId:17. Compactor will produce base_17_c160.
- * Suppose txnid:149 writes delta_18_18
- * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
- * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
- * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by
- * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
- * metadata that says that 18 is aborted.
- * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
- * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
- * it (since it has nothing but aborted data). But we can't tell which actually happened
- * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
- *
- * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
- * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
- * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be
- * useful if there is all of a sudden a flood of aborted txns. (For another day).
- */
-
- // Creating 'reader' list since we are interested in the set of 'obsolete' files
- ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
- LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
- return removeFiles(location, validWriteIdList, ci);
- }
- /**
- * @return true if any files were removed
- */
- private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
- throws Exception {
- Path path = new Path(location);
- FileSystem fs = path.getFileSystem(conf);
-
- // Collect all of the files/dirs
- Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
- AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false,
- dirSnapshots);
- Table table = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
- boolean isDynPartAbort = isDynPartAbort(table, ci);
-
- List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
- if (isDynPartAbort || dir.hasUncompactedAborts()) {
- ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
- }
- List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
- if (dir.getObsolete().size() > 0) {
- AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
- txnHandler);
- }
- // Make sure there are no leftovers below the compacted watermark
- if (ci.minOpenWriteId < 0) {
- conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
- }
- dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
- ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
- Ref.from(false), false, dirSnapshots);
-
- List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), deleted);
- if (!remained.isEmpty()) {
- LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
- " obsolete directories from " + location + ". " + getDebugInfo(remained));
- return false;
- }
- LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
- return true;
- }
-
- private List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
- List<Path> obsoleteDirs = dir.getObsolete();
- /**
- * add anything in 'dir' that only has data from aborted transactions - no one should be
- * trying to read anything in that dir (except getAcidState() that only reads the name of
- * this dir itself)
- * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there
- * are no active txns when cleaner runs). The key is to not delete metadata about aborted
- * txns with write IDs > {@link CompactionInfo#highestWriteId}.
- * See {@link TxnStore#markCleaned(CompactionInfo)}
- */
- obsoleteDirs.addAll(dir.getAbortedDirectories());
- if (isDynPartAbort) {
- // In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
- // Including obsolete directories for partitioned tables can result in data loss.
- obsoleteDirs = dir.getAbortedDirectories();
- }
- return obsoleteDirs;
- }
-
- private boolean removeFiles(String location, CompactionInfo ci)
- throws IOException, MetaException {
- String strIfPurge = ci.getProperty("ifPurge");
- boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
- Path path = new Path(location);
- return !remove(location, ci, Collections.singletonList(path), ifPurge,
- path.getFileSystem(conf)).isEmpty();
+ @Override
+ public boolean isCacheEnabled() {
+ return MetastoreConf.getBoolVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
}
- private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
- throws MetaException, IOException {
- List<Path> deleted = new ArrayList<>();
- if (paths.size() < 1) {
- return deleted;
- }
- LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
- " obsolete directories from " + location + ". " + getDebugInfo(paths));
- boolean needCmRecycle;
- try {
- Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
- needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
- } catch (NoSuchObjectException ex) {
- // can not drop a database which is a source of replication
- needCmRecycle = false;
- }
- for (Path dead : paths) {
- LOG.debug("Going to delete path " + dead.toString());
- if (needCmRecycle) {
- replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
- }
- if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
- deleted.add(dead);
- }
- }
- return deleted;
- }
-
- private String getDebugInfo(List<Path> paths) {
- return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
+ @VisibleForTesting
+ public void setCleanupHandlers(List<TaskHandler> cleanupHandlers) {
+ this.cleanupHandlers = cleanupHandlers;
}
private static class CleanerCycleUpdater implements Runnable {
@@ -544,5 +174,4 @@ public class Cleaner extends MetaStoreCompactorThread {
updateCycleDurationMetric(metric, startedAt);
}
}
-
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanupRequest.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanupRequest.java
new file mode 100644
index 00000000000..badfaeaf283
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanupRequest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * Holds the information needed for one cleanup: the location, the obsolete directories to
+ * remove, the purge flag, the user to run as, and the database / full partition name.
+ * Built by the cleanup handlers via {@link CleanupRequestBuilder} and passed to FSRemover.
+ */
+public class CleanupRequest {
+  private final String location;
+  private final List<Path> obsoleteDirs;
+  private final boolean purge;
+  private final String runAs;
+  private final String dbName;
+  private final String fullPartitionName;
+
+  public CleanupRequest(CleanupRequestBuilder builder) {
+    this.location = builder.location;
+    this.obsoleteDirs = builder.obsoleteDirs;
+    this.purge = builder.purge;
+    this.runAs = builder.runAs;
+    this.dbName = builder.dbName;
+    this.fullPartitionName = builder.fullPartitionName;
+  }
+
+  public String getLocation() {
+    return location;
+  }
+
+  public List<Path> getObsoleteDirs() {
+    return obsoleteDirs;
+  }
+
+  public boolean isPurge() {
+    return purge;
+  }
+
+  public String runAs() {
+    return runAs;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public String getFullPartitionName() {
+    return fullPartitionName;
+  }
+
+  /**
+   * Fluent builder for {@link CleanupRequest}; unset fields stay null (or false for purge).
+   */
+  public static class CleanupRequestBuilder {
+    private String location;
+    private List<Path> obsoleteDirs;
+    private boolean purge;
+    private String runAs;
+    private String dbName;
+    private String fullPartitionName;
+
+    public CleanupRequestBuilder setLocation(String location) {
+      this.location = location;
+      return this;
+    }
+
+    public CleanupRequestBuilder setObsoleteDirs(List<Path> obsoleteDirs) {
+      this.obsoleteDirs = obsoleteDirs;
+      return this;
+    }
+
+    public CleanupRequestBuilder setPurge(boolean purge) {
+      this.purge = purge;
+      return this;
+    }
+
+    public CleanupRequestBuilder setDbName(String dbName) {
+      this.dbName = dbName;
+      return this;
+    }
+
+    public CleanupRequestBuilder setRunAs(String runAs) {
+      this.runAs = runAs;
+      return this;
+    }
+
+    public CleanupRequestBuilder setFullPartitionName(String fullPartitionName) {
+      this.fullPartitionName = fullPartitionName;
+      return this;
+    }
+
+    public CleanupRequest build() {
+      return new CleanupRequest(this);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 7111b942563..ddc6a0312aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -17,22 +17,36 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.utils.StringableMap;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.lang.String.format;
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
public class CompactorUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(CompactorUtil.class);
public static final String COMPACTOR = "compactor";
/**
* List of accepted properties for defining the compactor's job queue.
@@ -100,4 +114,57 @@ public class CompactorUtil {
return conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
}
+ /**
+ * Picks the storage descriptor to work with: the partition's when a partition is given,
+ * otherwise the table's.
+ *
+ * @param t table owning the data
+ * @param p partition, or {@code null} when the target is the table itself
+ * @return {@code p.getSd()} if {@code p} is non-null, else {@code t.getSd()}
+ */
+ public static StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
+ if (p != null) {
+ return p.getSd();
+ }
+ return t.getSd();
+ }
+
+ public static boolean isDynPartAbort(Table t, String partName) {
+ return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> !pk.isEmpty()).isPresent()
+ && partName == null;
+ }
+
+ public static List<Partition> getPartitionsByNames(HiveConf conf, String dbName, String tableName, String partName) throws MetaException {
+ try {
+ return getMSForConf(conf).getPartitionsByNames(getDefaultCatalog(conf), dbName, tableName,
+ Collections.singletonList(partName));
+ } catch (Exception e) {
+ LOG.error("Unable to get partitions by name = {}.{}.{}", dbName, tableName, partName);
+ throw new MetaException(e.toString());
+ }
+ }
+
+ /**
+ * Renders the last path component of each path as a bracketed, comma-separated list
+ * (for example {@code [delta_1_1,base_2]}) for use in log messages.
+ *
+ * @param paths paths to describe
+ * @return the bracketed list; {@code []} for an empty input
+ */
+ public static String getDebugInfo(List<Path> paths) {
+ return paths.stream()
+ .map(Path::getName)
+ .collect(Collectors.joining(",", "[", "]"));
+ }
+
+ /**
+ * Determine whether to run this job as the current user or whether we need a doAs to switch
+ * users.
+ * @param owner of the directory we will be working in, as determined by
+ * {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#findUserToRunAs(String, Table, Configuration)}
+ * @return true if the job should run as the current user, false if a doAs is needed.
+ */
+ public static boolean runJobAsSelf(String owner) {
+ return (owner.equals(System.getProperty("user.name")));
+ }
+
+ /**
+ * Collects the directories the cleaner should remove.
+ *
+ * Normally this is every obsolete directory plus every directory that only holds data from
+ * aborted transactions. For an aborted dynamic-partition write only the aborted directories are
+ * returned: including obsolete directories for partitioned tables can result in data loss.
+ *
+ * NOTE(review): the aborted directories are appended to the list returned by
+ * {@code dir.getObsolete()} itself. If that getter exposes its backing list (confirm in
+ * AcidDirectory), later {@code dir.getObsolete()} calls see them too, even in the
+ * dynamic-partition case.
+ *
+ * @param dir acid state of the location being cleaned
+ * @param isDynPartAbort whether the cleanup is for an aborted dynamic-partition write
+ * @return the paths to remove
+ */
+ public static List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
+ List<Path> candidates = dir.getObsolete();
+ // Directories holding only aborted data can go too: nobody should read them (getAcidState()
+ // only looks at the directory name). This may run ahead of CompactionInfo#highestWriteId, which
+ // is fine as long as aborted-txn metadata above highestWriteId is kept - see
+ // TxnStore#markCleaned(CompactionInfo).
+ candidates.addAll(dir.getAbortedDirectories());
+ return isDynPartAbort ? dir.getAbortedDirectories() : candidates;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/FSRemover.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/FSRemover.java
new file mode 100644
index 00000000000..e59a5490206
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/FSRemover.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * Removes the directories listed in a {@link CleanupRequest}. The removal runs as the current
+ * process user, or inside a proxy-user doAs when the request's {@code runAs} user is different.
+ */
+public class FSRemover {
+ private static final Logger LOG = LoggerFactory.getLogger(FSRemover.class);
+ private final HiveConf conf;
+ private final ReplChangeManager replChangeManager;
+ private final MetadataCache metadataCache;
+
+ public FSRemover(HiveConf conf, ReplChangeManager replChangeManager, MetadataCache metadataCache) {
+ this.conf = conf;
+ this.replChangeManager = replChangeManager;
+ this.metadataCache = metadataCache;
+ }
+
+ /**
+ * Removes the obsolete directories of {@code cr}.
+ *
+ * Any failure is logged and swallowed. In that case the returned list is empty, even if some
+ * paths had already been removed before the failure.
+ *
+ * @param cr cleanup request
+ * @return paths that were removed; empty if there was nothing to remove or cleaning failed
+ * @throws MetaException never thrown by the current implementation (failures are logged);
+ *     kept in the signature for callers
+ */
+ public List<Path> clean(CleanupRequest cr) throws MetaException {
+ Ref<List<Path>> removedFiles = Ref.from(new ArrayList<>());
+ try {
+ Callable<List<Path>> cleanUpTask = () -> removeFiles(cr);
+
+ if (CompactorUtil.runJobAsSelf(cr.runAs())) {
+ removedFiles.value = cleanUpTask.call();
+ } else {
+ LOG.info("Cleaning as user {} for {}", cr.runAs(), cr.getFullPartitionName());
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(cr.runAs(),
+ UserGroupInformation.getLoginUser());
+ try {
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ removedFiles.value = cleanUpTask.call();
+ return null;
+ });
+ } finally {
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: {} for {}",
+ ugi, cr.getFullPartitionName(), exception);
+ }
+ }
+ }
+ } catch (Exception ex) {
+ if (ex instanceof InterruptedException) {
+ // doAs may be interrupted; restore the flag so the executing thread can observe it.
+ Thread.currentThread().interrupt();
+ }
+ LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", cr,
+ StringUtils.stringifyException(ex));
+ }
+ return removedFiles.value;
+ }
+
+ /**
+ * Passes every obsolete directory of the request to {@code FileUtils.moveToTrash} (with the
+ * request's purge flag). When the database is a source of replication, each directory is first
+ * recycled through the {@link ReplChangeManager}.
+ *
+ * @param cr cleanup request
+ * @return paths for which {@code FileUtils.moveToTrash} reported success; empty if the request
+ *     has no obsolete directories
+ * @throws MetaException if looking up the database fails for any reason other than it not
+ *     existing; the original failure is kept as the cause
+ * @throws IOException on file-system errors
+ */
+ private List<Path> removeFiles(CleanupRequest cr)
+ throws MetaException, IOException {
+ List<Path> deleted = new ArrayList<>();
+ if (cr.getObsoleteDirs().isEmpty()) {
+ return deleted;
+ }
+ LOG.info("About to remove {} obsolete directories from {}. {}", cr.getObsoleteDirs().size(),
+ cr.getLocation(), CompactorUtil.getDebugInfo(cr.getObsoleteDirs()));
+ boolean needCmRecycle;
+ try {
+ Database db = metadataCache.computeIfAbsent(cr.getDbName(), () -> getMSForConf(conf).getDatabase(getDefaultCatalog(conf), cr.getDbName()));
+ needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
+ } catch (NoSuchObjectException ex) {
+ // A database that is a source of replication cannot be dropped, so a missing one is not a source.
+ needCmRecycle = false;
+ } catch (Exception ex) {
+ MetaException metaException = new MetaException(ex.getMessage());
+ // Keep the underlying failure for diagnosis; MetaException has no cause-taking constructor.
+ metaException.initCause(ex);
+ throw metaException;
+ }
+ FileSystem fs = new Path(cr.getLocation()).getFileSystem(conf);
+ for (Path dead : cr.getObsoleteDirs()) {
+ LOG.debug("Going to delete path: {}", dead);
+ if (needCmRecycle) {
+ replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, cr.isPurge());
+ }
+ if (FileUtils.moveToTrash(fs, dead, conf, cr.isPurge())) {
+ deleted.add(dead);
+ }
+ }
+ return deleted;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index ee7727b69ba..284a6ffca15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -149,7 +149,7 @@ public class Initiator extends MetaStoreCompactorThread {
// Currently we invalidate all entries after each cycle, because the bootstrap replication is marked via
// table property hive.repl.first.inc.pending which would be cached.
- invalidateMetaCache();
+ metadataCache.invalidate();
Set<String> skipDBs = Sets.newConcurrentHashSet();
Set<String> skipTables = Sets.newConcurrentHashSet();
@@ -179,7 +179,7 @@ public class Initiator extends MetaStoreCompactorThread {
return;
}
- Table t = computeIfAbsent(ci.getFullTableName(),() -> resolveTable(ci));
+ Table t = metadataCache.computeIfAbsent(ci.getFullTableName(),() -> resolveTable(ci));
String poolName = getPoolName(ci, t);
Partition p = resolvePartition(ci);
if (p == null && ci.partName != null) {
@@ -246,6 +246,12 @@ public class Initiator extends MetaStoreCompactorThread {
}
}
+ @Override
+ public boolean isCacheEnabled() {
+ return MetastoreConf.getBoolVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
+ }
+
private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String poolName,
String runAs, boolean metricsEnabled)
throws MetaException {
@@ -280,7 +286,7 @@ public class Initiator extends MetaStoreCompactorThread {
Map<String, String> params = t.getParameters();
String poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
if (StringUtils.isBlank(poolName)) {
- params = computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
+ params = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
}
return poolName;
@@ -329,9 +335,6 @@ public class Initiator extends MetaStoreCompactorThread {
compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
- boolean tableCacheOn = MetastoreConf.getBoolVar(conf,
- MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
- initializeCache(tableCacheOn);
metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
}
@@ -593,7 +596,7 @@ public class Initiator extends MetaStoreCompactorThread {
return false;
}
- Table t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
+ Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
if (t == null) {
LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
"table or has been dropped and moving on.");
@@ -605,7 +608,7 @@ public class Initiator extends MetaStoreCompactorThread {
return false;
}
- Map<String, String> dbParams = computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
+ Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) {
if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) {
skipDBs.add(ci.dbname);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 1db2e539dcb..d7871b47054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -32,17 +30,12 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TBase;
import org.apache.thrift.TException;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
@@ -53,12 +46,11 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
* Compactor threads that runs in the metastore. It uses a {@link TxnStore}
* to access the internal database.
*/
-public class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread {
+public abstract class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread {
protected TxnStore txnHandler;
protected ScheduledExecutorService cycleUpdaterExecutorService;
-
- private Optional<Cache<String, TBase>> metaCache = Optional.empty();
+ protected MetadataCache metadataCache;
@Override
public void init(AtomicBoolean stop) throws Exception {
@@ -66,6 +58,7 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
// Get our own instance of the transaction handler
txnHandler = TxnUtils.getTxnStore(conf);
+ metadataCache = new MetadataCache(isCacheEnabled());
// Initialize the RawStore, with the flag marked as true. Since its stored as a ThreadLocal variable in the
// HMSHandlerContext, it will use the compactor related pool.
MetastoreConf.setBoolVar(conf, COMPACTOR_USE_CUSTOM_POOL, true);
@@ -97,18 +90,11 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
}
@Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
- try {
- return getMSForConf(conf).getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
- Collections.singletonList(ci.partName));
- } catch (MetaException e) {
- LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
- throw e;
- } catch (NoSuchObjectException e) {
- LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
- throw new MetaException(e.toString());
- }
+ return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName);
}
+ /**
+ * Consulted by {@link #init} when creating {@link #metadataCache}: when this returns false the
+ * cache stores nothing and every lookup invokes its loader directly.
+ *
+ * @return true to cache metadata objects between lookups
+ */
+ public abstract boolean isCacheEnabled();
+
protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
if (cycleUpdaterExecutorService == null) {
if (updateInterval > 0) {
@@ -142,26 +128,4 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
return 0;
}
- <T extends TBase<T,?>> T computeIfAbsent(String key, Callable<T> callable) throws Exception {
- if (metaCache.isPresent()) {
- try {
- return (T) metaCache.get().get(key, callable);
- } catch (ExecutionException e) {
- throw (Exception) e.getCause();
- }
- }
- return callable.call();
- }
-
- Optional<Cache<String, TBase>> initializeCache(boolean tableCacheOn) {
- if (tableCacheOn) {
- metaCache = Optional.of(CacheBuilder.newBuilder().softValues().build());
- }
- return metaCache;
- }
-
- void invalidateMetaCache(){
- metaCache.ifPresent(Cache::invalidateAll);
- }
-
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
new file mode 100644
index 00000000000..a09491b4e2f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.thrift.TBase;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+public class MetadataCache {
+
+ private Cache<String, TBase> metaCache;
+
+ public MetadataCache(boolean tableCacheOn) {
+ if (tableCacheOn) {
+ metaCache = CacheBuilder.newBuilder().softValues().build();
+ }
+ }
+
+ public <T extends TBase<T,?>> T computeIfAbsent(String key, Callable<T> callable) throws Exception {
+ if (metaCache != null) {
+ try {
+ return (T) metaCache.get(key, callable);
+ } catch (ExecutionException e) {
+ throw (Exception) e.getCause();
+ }
+ }
+ return callable.call();
+ }
+
+ public void invalidate() {
+ if (metaCache != null) {
+ metaCache.invalidateAll();
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
new file mode 100644
index 00000000000..a352dca4dc5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static java.util.Objects.isNull;
+
+/**
+ * A compaction-based implementation of {@link TaskHandler}.
+ * Creates the cleanup tasks for compactions that are ready to be cleaned.
+ */
+class CompactionCleaner extends TaskHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactionCleaner.class.getName());
+
+ /**
+ * Creates the handler. All arguments are passed unchanged to {@link TaskHandler}.
+ */
+ public CompactionCleaner(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
+ boolean metricsEnabled, FSRemover fsRemover) {
+ super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+ }
+
+ @Override
+ public List<Runnable> getTasks() throws MetaException {
+ long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+ long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+ ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+ : 0;
+ List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+ if (!readyToClean.isEmpty()) {
+ long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+ // For checking which compaction can be cleaned we can use the minOpenTxnId
+ // However findReadyToClean will return all records that were compacted with old version of HMS
+ // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+ // to the clean method, to avoid cleaning up deltas needed for running queries
+ // when min_history_level is finally dropped, than every HMS will commit compaction the new way
+ // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
+ return readyToClean.stream().map(ci -> {
+ long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
+ LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+ return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled));
+ }).collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Cleans up after a single compaction.
+ *
+ * When the entry has no explicit {@code location} property, the table (and partition, if any) is
+ * resolved first. A missing table or partition, or one with NO_CLEANUP set, is simply marked
+ * cleaned. Otherwise the cleaner start is recorded, and then one of two things happens:
+ * <ul>
+ * <li>the acid directory is cleaned;</li>
+ * <li>for a dropped partition or a bare location, the whole location is removed.</li>
+ * </ul>
+ * Any failure is logged with its stack trace, counted when metrics are enabled, and passed to
+ * {@code handleCleanerAttemptFailure}.
+ *
+ * @param ci compaction queue entry to clean
+ * @param minOpenTxnGLB txn id watermark used when building the valid-txn list for the acid state
+ * @param metricsEnabled whether to record perf-log timing and the failure counter
+ * @throws MetaException only from the failure handling in the catch block; exceptions raised by
+ *     the cleaning itself are caught
+ */
+ private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+ LOG.info("Starting cleaning for {}", ci);
+ PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+ String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+ (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+ try {
+ if (metricsEnabled) {
+ perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric);
+ }
+ final String location = ci.getProperty("location");
+
+ Table t = null;
+ Partition p = null;
+
+ if (isNull(location)) {
+ t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+ if (isNull(t)) {
+ // The table was dropped before we got around to cleaning it.
+ LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
+ idWatermark(ci));
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+ // The table was marked no clean up true.
+ LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (!isNull(ci.partName)) {
+ p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+ if (isNull(p)) {
+ // The partition was dropped before we got around to cleaning it.
+ LOG.info("Unable to find partition {}, assuming it was dropped. {}",
+ ci.getFullPartitionName(), idWatermark(ci));
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+ // The partition was marked no clean up true.
+ LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ }
+ }
+ txnHandler.markCleanerStart(ci);
+
+ if (!isNull(t) || !isNull(ci.partName)) {
+ String path = isNull(location)
+ ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+ : location;
+ // p can only be null with a partition name here when an explicit location was given
+ // (otherwise the missing partition was already marked cleaned above).
+ boolean dropPartition = !isNull(ci.partName) && isNull(p);
+
+ //check if partition wasn't re-created
+ if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
+ cleanUsingLocation(ci, path, true);
+ } else {
+ cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+ }
+ } else {
+ cleanUsingLocation(ci, location, false);
+ }
+ } catch (Exception e) {
+ // The trailing throwable argument makes SLF4J log the stack trace as well.
+ LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
+ e.getMessage(), e);
+ ci.errorMessage = e.getMessage();
+ if (metricsEnabled) {
+ Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+ }
+ handleCleanerAttemptFailure(ci);
+ } finally {
+ if (metricsEnabled) {
+ perfLogger.perfLogEnd(CompactionCleaner.class.getName(), cleanerMetric);
+ }
+ }
+ }
+
+ /**
+ * Removes the given location via {@link FSRemover}, without looking at the acid state of its
+ * contents.
+ *
+ * When {@code requiresLock} is set, the lock built by {@link #createLockRequest} is taken
+ * around the removal and released afterwards; a failure to release it is only logged.
+ * The queue entry is marked cleaned if anything was removed; otherwise its cleaner start is cleared.
+ * NOTE(review): the state of the returned LockResponse is not inspected before removing files.
+ *
+ * @param ci compaction queue entry being cleaned
+ * @param path location to remove
+ * @param requiresLock whether to hold the exclusive write lock while removing
+ * @throws MetaException if the lock's txn is missing/aborted or a txn store call fails
+ */
+ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requiresLock) throws MetaException {
+ List<Path> deleted;
+ if (!requiresLock) {
+ deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+ } else {
+ LockRequest lockRequest = createLockRequest(ci);
+ LockResponse lockResponse = null;
+ try {
+ lockResponse = txnHandler.lock(lockRequest);
+ deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+ } catch (NoSuchTxnException | TxnAbortedException e) {
+ LOG.error("Error while trying to acquire exclusive write lock: {}", e.getMessage());
+ throw new MetaException(e.getMessage());
+ } finally {
+ if (lockResponse != null) {
+ try {
+ txnHandler.unlock(new UnlockRequest(lockResponse.getLockid()));
+ } catch (NoSuchLockException | TxnOpenException e) {
+ LOG.error("Error while trying to release exclusive write lock: {}", e.getMessage());
+ }
+ }
+ }
+ }
+ if (deleted.isEmpty()) {
+ txnHandler.clearCleanerStart(ci);
+ } else {
+ txnHandler.markCleaned(ci);
+ }
+ }
+
+ /**
+ * Cleans a table or partition location based on its acid state, in three steps:
+ * <ol>
+ * <li>Compute the obsolete (or aborted) directories under a valid-txn list capped at
+ * {@code minOpenTxnGLB}.</li>
+ * <li>Remove them via {@link FSRemover}.</li>
+ * <li>Re-read the acid state at {@code ci.highestWriteId} to check that no obsolete directory
+ * was left behind.</li>
+ * </ol>
+ * The queue entry is marked cleaned on success, or for an aborted dynamic-partition write.
+ * Otherwise its cleaner start is cleared.
+ *
+ * NOTE(review): this writes {@code ValidTxnList.VALID_TXNS_KEY} into the shared {@code conf}.
+ * If tasks from {@link #getTasks()} run concurrently, they may observe each other's value.
+ * Confirm how those tasks are executed.
+ *
+ * @param ci compaction queue entry being cleaned
+ * @param location table or partition location to clean
+ * @param minOpenTxnGLB watermark used to cap the valid-txn list
+ * @throws Exception on metastore, txn store or file-system failures
+ */
+ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+ ValidTxnList validTxnList =
+ TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+ //save it so that getAcidState() sees it
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+ /*
+ * {@code validTxnList} is capped by minOpenTxnGLB. So if
+ * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+ * produced by a compactor, every reader that could be active right now sees it as well.
+ * That means if this base/delta shadows some earlier base/delta, it will be used in favor
+ * of any files that it shadows. Thus, the shadowed files are safe to delete.
+ *
+ *
+ * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+ * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+ * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+ * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+ * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+ * writeId:17. Compactor will produce base_17_c160.
+ * Suppose txnid:149 writes delta_18_18
+ * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
+ * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+ * not visible based on 'validTxnList' capped at minOpenTxn. So it will not be in the
+ * obsolete set handed to {@link FSRemover#clean(CleanupRequest)}, and we must keep the
+ * metadata that says that 18 is aborted.
+ * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
+ * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
+ * it (since it has nothing but aborted data). But we can't tell which actually happened
+ * in markCleaned() so make sure it doesn't delete meta above CQ_HIGHEST_WRITE_ID.
+ *
+ * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
+ * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
+ * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be
+ * useful if there is all of a sudden a flood of aborted txns. (For another day).
+ */
+
+ // Creating 'reader' list since we are interested in the set of 'obsolete' files
+ ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+ LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+ Path path = new Path(location);
+ FileSystem fs = path.getFileSystem(conf);
+
+ // Collect all the files/dirs
+ Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+ AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+ dirSnapshots);
+ Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+ boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+ List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+ if (isDynPartAbort || dir.hasUncompactedAborts()) {
+ ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+ }
+
+ List<Path> deleted = fsRemover.clean(new CleanupRequestBuilder().setLocation(location)
+ .setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
+ .setRunAs(ci.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+ .build());
+
+ if (!deleted.isEmpty()) {
+ AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+ txnHandler);
+ }
+
+ // Make sure there are no leftovers below the compacted watermark
+ boolean success = false;
+ conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+ dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+ ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+ Ref.from(false), false, dirSnapshots);
+
+ List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+ if (!remained.isEmpty()) {
+ LOG.warn("{} Remained {} obsolete directories from {}. {}",
+ idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
+ } else {
+ LOG.debug("{} All cleared below the watermark: {} from {}", idWatermark(ci), ci.highestWriteId, location);
+ success = true;
+ }
+ // Reuse the flag computed above; the table and partition name have not changed since.
+ if (success || isDynPartAbort) {
+ txnHandler.markCleaned(ci);
+ } else {
+ txnHandler.clearCleanerStart(ci);
+ LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
+ }
+ }
+
+  /**
+   * Builds the lock request the cleaner uses: a single EXCL_WRITE component with operation type DELETE on
+   * the compaction's table, narrowed to its partition when {@code ci.partName} is set.
+   *
+   * @param ci the compaction whose directories are about to be cleaned
+   * @return a lock request for user {@code ci.runAs}, transaction id 0, agent info = current thread name
+   */
+  protected LockRequest createLockRequest(CompactionInfo ci) {
+    LockRequestBuilder lockRequestBuilder = new LockRequestBuilder(Thread.currentThread().getName());
+    lockRequestBuilder.setUser(ci.runAs);
+    lockRequestBuilder.setTransactionId(0);
+
+    LockComponentBuilder componentBuilder = new LockComponentBuilder()
+        .setLock(LockType.EXCL_WRITE)
+        .setOperationType(DataOperationType.DELETE)
+        .setDbName(ci.dbname)
+        .setTableName(ci.tableName)
+        .setIsTransactional(true);
+    boolean partitionLevel = ci.partName != null;
+    if (partitionLevel) {
+      componentBuilder.setPartitionName(ci.partName);
+    }
+    lockRequestBuilder.addLockComponent(componentBuilder.build());
+
+    // Zero-wait reads stay enabled unless both TXN_OVERWRITE_X_LOCK and TXN_WRITE_X_LOCK are switched on.
+    lockRequestBuilder.setZeroWaitReadEnabled(!(conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK)
+        && conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)));
+    return lockRequestBuilder.build();
+  }
+
+  /**
+   * Renders the compaction id as a log-message prefix, e.g. {@code " id=42"}.
+   */
+  private static String idWatermark(CompactionInfo ci) {
+    return new StringBuilder(" id=").append(ci.id).toString();
+  }
+
+  /**
+   * Fetches the valid write-id list of the compaction's table for the given transaction snapshot, capped at
+   * the compaction's highest write id.
+   *
+   * @param ci the compaction being cleaned
+   * @param validTxnList the transaction snapshot the write ids are resolved against
+   * @return the table's write-id list, with its high watermark lowered to {@code ci.highestWriteId} if needed
+   */
+  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
+      throws NoSuchTxnException, MetaException {
+    String fullTableName = AcidUtils.getFullTableName(ci.dbname, ci.tableName);
+    GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    writeIdsRequest.setValidTxnList(validTxnList.writeToString());
+    GetValidWriteIdsResponse writeIdsResponse = txnHandler.getValidWriteIds(writeIdsRequest);
+    // A table that was never written to could have no write ids, but the Cleaner only runs after a
+    // compaction, so delta/base directories (and therefore write ids) must exist.
+    assert writeIdsResponse != null && writeIdsResponse.getTblValidWriteIdsSize() == 1;
+    ValidReaderWriteIdList tableWriteIds =
+        TxnCommonUtils.createValidReaderWriteIdList(writeIdsResponse.getTblValidWriteIds().get(0));
+    // Only directories made obsolete by THIS compaction may be removed. With a long retention time a second
+    // compaction may already have run on the same partition; capping the watermark keeps its obsolete
+    // directories untouched so their retention time is honoured.
+    return ci.highestWriteId < tableWriteIds.getHighWatermark()
+        ? tableWriteIds.updateHighWatermark(ci.highestWriteId)
+        : tableWriteIds;
+  }
+
+  /**
+   * Records a failed cleaning attempt for {@code ci}.
+   *
+   * <p>The number of earlier attempts is recovered from {@code ci.retryRetention}, which this method sets to
+   * {@code 2^attempts * defaultRetention} on every failure. Once the attempt count reaches
+   * HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS the entry is passed to {@code txnHandler.markFailed}; otherwise
+   * the grown retention is stored via {@code txnHandler.setCleanerRetryRetentionTimeOnError}.
+   *
+   * @param ci the compaction whose cleanup failed
+   * @throws MetaException propagated from the txnHandler calls
+   */
+  private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
+    long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
+    int cleanAttempts = 0;
+    // Without a positive base retention the attempt count cannot be recovered (and the division would throw).
+    if (ci.retryRetention > 0 && defaultRetention > 0) {
+      // A positive retryRetention means at least one attempt already failed. Keep the ratio >= 1 even if the
+      // configured base retention was raised since then; log(0) would otherwise yield a negative count.
+      long ratio = Math.max(1L, ci.retryRetention / defaultRetention);
+      // Exact floor(log2(ratio)) in integer arithmetic, free of Math.log rounding errors.
+      cleanAttempts = (Long.SIZE - 1 - Long.numberOfLeadingZeros(ratio)) + 1;
+    }
+    if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+      //Mark it as failed if the max attempt threshold is reached.
+      txnHandler.markFailed(ci);
+    } else {
+      //Calculate retry retention time and update record.
+      ci.retryRetention = (long) Math.pow(2, cleanAttempts) * defaultRetention;
+      txnHandler.setCleanerRetryRetentionTimeOnError(ci);
+    }
+  }
+
+  /**
+   * Builds a cleanup request whose single obsolete directory is {@code location} itself.
+   *
+   * @param ci the queue entry; its "ifPurge" property decides whether the files are purged
+   * @param location the directory to remove
+   * @return the cleanup request for {@code location}
+   */
+  private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
+    // Purge only when "ifPurge" parses as true. The former `strIfPurge != null || parseBoolean(...)` purged
+    // whenever the property was present, even for "false", since parseBoolean was reached only for null.
+    // NOTE(review): assumes the producer stores "true"/"false" strings - confirm.
+    String strIfPurge = ci.getProperty("ifPurge");
+    boolean ifPurge = Boolean.parseBoolean(strIfPurge);
+
+    Path obsoletePath = new Path(location);
+    return new CleanupRequestBuilder()
+        .setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
+        .setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath))
+        .build();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
new file mode 100644
index 00000000000..d804a318e54
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * Base class for the Cleaner's task handlers. It holds the shared collaborators (transaction store,
+ * configuration, metadata cache, file-system remover) and provides helpers to resolve tables and partitions.
+ */
+public abstract class TaskHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskHandler.class.getName());
+  protected final TxnStore txnHandler;
+  protected final HiveConf conf;
+  protected final boolean metricsEnabled;
+  protected final MetadataCache metadataCache;
+  protected final FSRemover fsRemover;
+
+  TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
+      boolean metricsEnabled, FSRemover fsRemover) {
+    this.conf = conf;
+    this.txnHandler = txnHandler;
+    this.metadataCache = metadataCache;
+    this.metricsEnabled = metricsEnabled;
+    this.fsRemover = fsRemover;
+  }
+
+  /**
+   * Returns the units of cleanup work this handler wants executed.
+   *
+   * @return the tasks to run; presumably executed by the Cleaner - confirm against its scheduling code
+   * @throws MetaException if the work cannot be determined from the metastore
+   */
+  public abstract List<Runnable> getTasks() throws MetaException;
+
+  /**
+   * Looks up a table in the default catalog through the metastore raw store for {@link #conf}.
+   *
+   * @param dbName database name
+   * @param tableName table name
+   * @return the table as returned by the raw store (NOTE(review): may be null if not found - confirm)
+   * @throws MetaException logged and rethrown when the lookup fails
+   */
+  protected Table resolveTable(String dbName, String tableName) throws MetaException {
+    try {
+      return getMSForConf(conf).getTable(getDefaultCatalog(conf), dbName, tableName);
+    } catch (MetaException e) {
+      LOG.error("Unable to find table {}.{}, {}", dbName, tableName, e.getMessage());
+      throw e;
+    }
+  }
+
+  /**
+   * Resolves exactly one partition by name.
+   *
+   * @param dbName database name
+   * @param tableName table name
+   * @param partName partition name, or null for an unpartitioned target
+   * @return the partition; null when {@code partName} is null or no partition matches (e.g. it was dropped)
+   * @throws MetaException when the lookup fails (logged and rethrown) or the name matches several partitions
+   */
+  protected Partition resolvePartition(String dbName, String tableName, String partName) throws MetaException {
+    if (partName == null) {
+      return null;
+    }
+    List<Partition> parts;
+    try {
+      parts = CompactorUtil.getPartitionsByNames(conf, dbName, tableName, partName);
+      if (parts == null || parts.isEmpty()) {
+        // The partition got dropped before we went looking for it.
+        return null;
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to find partition: {}.{}.{}", dbName, tableName, partName, e);
+      throw e;
+    }
+    if (parts.size() != 1) {
+      LOG.error("{}.{}.{} does not refer to a single partition. {}", dbName, tableName, partName,
+          Arrays.toString(parts.toArray()));
+      // String.join's first argument is the delimiter, so the message prefix is concatenated separately.
+      throw new MetaException("Too many partitions for : " + String.join(".", dbName, tableName, partName));
+    }
+    return parts.get(0);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
new file mode 100644
index 00000000000..79293a22c26
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Singleton factory creating the {@link TaskHandler}s that are handed to the Cleaner.
+ */
+public class TaskHandlerFactory {
+  private static final TaskHandlerFactory INSTANCE = new TaskHandlerFactory();
+
+  /**
+   * Not instantiable from outside; callers go through {@link #getInstance()}.
+   */
+  private TaskHandlerFactory() {
+  }
+
+  /**
+   * @return the shared factory instance
+   */
+  public static TaskHandlerFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates the cleanup handlers, all sharing the supplied collaborators. At present the only handler is a
+   * {@link CompactionCleaner}.
+   *
+   * @return a fixed-size list of handlers
+   */
+  public List<TaskHandler> getHandlers(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
+      boolean metricsEnabled, FSRemover fsRemover) {
+    TaskHandler compactionCleaner =
+        new CompactionCleaner(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+    return Arrays.asList(compactionCleaner);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 8812c0ca44f..82f052fe362 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
@@ -41,6 +42,8 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -58,7 +61,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.mockito.Mockito;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
@@ -69,7 +71,6 @@ import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
/**
* Tests for the compactor Cleaner thread
@@ -102,6 +103,10 @@ public class TestCleaner extends CompactorTest {
//Prevent cleaner from marking the compaction as cleaned
TxnStore mockedHandler = spy(txnHandler);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+ List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
+ .getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
Table t = newTable("default", "retry_test", false);
@@ -125,6 +130,7 @@ public class TestCleaner extends CompactorTest {
for (int i = 1; i < 4; i++) {
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
+ cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
@@ -151,6 +157,7 @@ public class TestCleaner extends CompactorTest {
//Do a final run to reach the maximum retry attempts, so the state finally should be set to failed
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
+ cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
@@ -184,11 +191,16 @@ public class TestCleaner extends CompactorTest {
//Prevent cleaner from marking the compaction as cleaned
TxnStore mockedHandler = spy(txnHandler);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+ List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
+ .getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
//Do a run to fail the clean and set the retention time
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
+ cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
@@ -204,6 +216,7 @@ public class TestCleaner extends CompactorTest {
//Do a final run and check if the compaction is not picked up again
cleaner = new Cleaner();
cleaner.setConf(conf);
+ cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
@@ -744,7 +757,7 @@ public class TestCleaner extends CompactorTest {
rqst.setPartitionname(partName);
long compactTxn = compactInTxn(rqst);
addDeltaFile(t, p, 21, 22, 2);
-
+
txnHandler.addWriteIdsToMinHistory(1, Collections.singletonMap("default.trfcp", 23L));
startCleaner();
@@ -1104,37 +1117,6 @@ public class TestCleaner extends CompactorTest {
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
}
- @Test
- public void testMetaCache() throws Exception {
- conf.setBoolVar(HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, false);
-
- Table t = newTable("default", "retry_test", false);
-
- addBaseFile(t, null, 20L, 20);
- addDeltaFile(t, null, 21L, 22L, 2);
- addDeltaFile(t, null, 23L, 24L, 2);
- burnThroughTransactions("default", "retry_test", 25);
-
- CompactionRequest rqst = new CompactionRequest("default", "retry_test", CompactionType.MAJOR);
- long compactTxn = compactInTxn(rqst);
- addBaseFile(t, null, 25L, 25, compactTxn);
-
- //Prevent cleaner from marking the compaction as cleaned
- TxnStore mockedHandler = spy(txnHandler);
- doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
- Cleaner cleaner = Mockito.spy(new Cleaner());
- cleaner.setConf(conf);
- cleaner.init(new AtomicBoolean(true));
- cleaner.run();
- cleaner.run();
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Mockito.verify(cleaner, times(2)).computeIfAbsent(Mockito.any(),Mockito.any());
- Mockito.verify(cleaner, times(1)).resolveTable(Mockito.any());
- }
-
private void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception {
AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
awiRqst.setTxnIds(Collections.singletonList(txnId));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
new file mode 100644
index 00000000000..c14da6950c4
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hadoop.hive.ql.txn.compactor.TestCleaner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests how the Cleaner drives its {@link TaskHandler}s, {@link FSRemover} and {@link MetadataCache}.
+ *
+ * <p>NOTE(review): extending TestCleaner makes JUnit re-run every inherited TestCleaner test in this class as
+ * well; confirm this is intended rather than extending a lighter base class.
+ */
+public class TestHandler extends TestCleaner {
+
+  @Test
+  public void testCompactionHandlerAndFsRemover() throws Exception {
+    Table t = newTable("default", "handler_test", true);
+    Partition p = newPartition(t, "today");
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addBaseFile(t, p, 25L, 25);
+
+    burnThroughTransactions(t.getDbName(), t.getTableName(), 25);
+
+    CompactionRequest rqst = new CompactionRequest(t.getDbName(), t.getTableName(), CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    compactInTxn(rqst);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = spy(new CompactionCleaner(conf, txnHandler, metadataCache,
+        false, mockedFSRemover));
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.run();
+
+    // One cleaner run asks the handler for its tasks once and issues exactly one removal request.
+    Mockito.verify(mockedFSRemover, times(1)).clean(any(CleanupRequest.class));
+    Mockito.verify(mockedTaskHandler, times(1)).getTasks();
+  }
+
+  @Test
+  public void testMetaCache() throws Exception {
+    conf.setBoolVar(HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, false);
+
+    Table t = newTable("default", "retry_test", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    burnThroughTransactions("default", "retry_test", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "retry_test", CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    // Spy the metadata cache and the handler so cache lookups and metastore table resolutions can be counted.
+    // Nothing is stubbed on the txn handler spy; the cleaner is free to mark the compaction as cleaned.
+    MetadataCache mockedMetadataCache = spy(new MetadataCache(true));
+    TxnStore mockedTxnHandler = spy(txnHandler);
+    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), mockedMetadataCache);
+    TaskHandler mockedTaskHandler = spy(new CompactionCleaner(conf, mockedTxnHandler, mockedMetadataCache,
+        false, fsRemover));
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.run();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    // The cache is consulted several times, but the table is resolved from the metastore only once.
+    Mockito.verify(mockedMetadataCache, times(3)).computeIfAbsent(any(), any());
+    Mockito.verify(mockedTaskHandler, times(1)).resolveTable(any(), any());
+  }
+}