You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/08/21 07:59:15 UTC
[hive] branch master updated: HIVE-22081: Hivemetastore
Performance: Compaction Initiator Thread overwhelmed if there are too many
Table/partitions are eligible for compaction (Rajkumar Singh,
via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e76f613 HIVE-22081: Hivemetastore Performance: Compaction Initiator Thread overwhelmed if there are too many Table/partitions are eligible for compaction (Rajkumar Singh, via Peter Vary)
e76f613 is described below
commit e76f613e19037b28f1a82ce9a61adbb4d78693cd
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Wed Aug 21 09:59:04 2019 +0200
HIVE-22081: Hivemetastore Performance: Compaction Initiator Thread overwhelmed if there are too many Table/partitions are eligible for compaction (Rajkumar Singh, via Peter Vary)
---
.../hive/ql/txn/compactor/CompactorThread.java | 6 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 91 ++++++++++++++--------
.../ql/txn/compactor/MetaStoreCompactorThread.java | 6 +-
3 files changed, 67 insertions(+), 36 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 94f0031..99da86f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -224,6 +224,10 @@ public abstract class CompactorThread extends Thread implements Configurable {
protected boolean replIsCompactionDisabledForTable(Table tbl) {
// Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
- return ReplUtils.isFirstIncPending(tbl.getParameters());
+ boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters()); // decided from the table's parameters alone
+ if (isCompactDisabled) {
+ LOG.info("Compaction is disabled for table " + tbl.getTableName()); // NOTE(review): the name logged here is not database-qualified - confirm that is intended
+ }
+ return isCompactDisabled;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5fb2552..610cf05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -51,11 +51,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
/**
* A class to initiate compactions. This will run in a separate thread.
@@ -66,6 +68,8 @@ public class Initiator extends MetaStoreCompactorThread {
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
+ Map<String, String> tblNameOwnersCache = new HashMap<>(); // full table name -> user to run the compaction as, stored after a findUserToRunAs lookup; NOTE(review): no eviction is visible in this diff and the field is package-private and non-final - confirm both are intended
+
private long checkInterval;
@@ -93,7 +97,8 @@ public class Initiator extends MetaStoreCompactorThread {
startedAt = System.currentTimeMillis();
//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+ .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
for (CompactionInfo ci : potentials) {
@@ -105,40 +110,8 @@ public class Initiator extends MetaStoreCompactorThread {
}
LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
try {
- if (replIsCompactionDisabledForDatabase(ci.dbname)) {
- // Compaction is disabled for replicated database until after first successful incremental load.
- LOG.info("Compaction is disabled for database " + ci.dbname);
- continue;
- }
Table t = resolveTable(ci);
- if (t == null) {
- // Most likely this means it's a temp table
- LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
- "table or has been dropped and moving on.");
- continue;
- }
-
- // check if no compaction set for this table
- if (noAutoCompactSet(t)) {
- LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it.");
- continue;
- }
-
- if (replIsCompactionDisabledForTable(t)) {
- // Compaction is disabled for replicated table until after first successful incremental load.
- LOG.info("Compaction is disabled for table " + ci.getFullTableName());
- continue;
- }
-
- // Check to see if this is a table level request on a partitioned table. If so,
- // then it's a dynamic partitioning case and we shouldn't check the table itself.
- if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
- ci.partName == null) {
- LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
- " partitioning");
- continue;
- }
// Check if we already have initiated or are working on a compaction for this partition
// or table. If so, skip it. If we are just waiting on cleaning we can still check,
@@ -176,7 +149,13 @@ public class Initiator extends MetaStoreCompactorThread {
txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
StorageDescriptor sd = resolveStorageDescriptor(t, p);
- String runAs = findUserToRunAs(sd.getLocation(), t);
+ String runAs = tblNameOwnersCache.get(fullTableName);
+ if (runAs == null) {
+ LOG.debug("unable to find the table owner in the cache for table "+ fullTableName + " " +
+ "will determine user based on table location");
+ runAs = findUserToRunAs(sd.getLocation(), t);
+ tblNameOwnersCache.put(fullTableName, runAs);
+ }
/*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
* Long term we should consider having a thread pool here and running checkForCompactionS
* in parallel*/
@@ -220,6 +199,7 @@ public class Initiator extends MetaStoreCompactorThread {
}
}
+
@Override
public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
super.init(stop, looped);
@@ -398,4 +378,47 @@ public class Initiator extends MetaStoreCompactorThread {
}
return noAutoCompact != null && noAutoCompact.equalsIgnoreCase("true");
}
+
+  // A table-level entry (no partition name) on a table that has partition keys comes from
+  // dynamic partitioning; it is reported here so that the table itself is not checked.
+  private static boolean checkDynPartitioning(Table t, CompactionInfo ci) {
+    if (t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty()
+        || ci.partName != null) {
+      return false;
+    }
+    LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
+        " partitioning");
+    return true;
+  }
+
+  /**
+   * Tells whether a potential compaction should be examined further by the initiator.
+   * @param ci the potential compaction to check
+   * @return false if the table is missing, compaction is disabled or opted out for it, the
+   *         entry stems from dynamic partitioning, or a check throws; true otherwise
+   */
+  private boolean checkCompactionElig(CompactionInfo ci) {
+    try {
+      Table t = resolveTable(ci);
+      if (t == null) {
+        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
+            "table or has been dropped and moving on.");
+        return false;
+      }
+      if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+        return false;
+      }
+      if (noAutoCompactSet(t)) {
+        LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT +
+            "=true so we will not compact it.");
+        return false;
+      }
+      return !replIsCompactionDisabledForTable(t) && !checkDynPartitioning(t, ci);
+    } catch (Throwable e) {
+      // Fail closed: an entry whose checks could not complete must not be treated as eligible.
+      LOG.error("Caught exception while checking compaction eligibility for "
+          + ci.getFullPartitionName(), e);
+      return false;
+    }
+  }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index a6dd4fa..8bfb524 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -78,7 +78,11 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
try {
Database database = rs.getDatabase(getDefaultCatalog(conf), dbName);
// Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
- return ReplUtils.isFirstIncPending(database.getParameters());
+ boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
+ if (isReplCompactDisabled) {
+ LOG.info("Compaction is disabled for database " + dbName);
+ }
+ return isReplCompactDisabled;
} catch (NoSuchObjectException e) {
LOG.info("Unable to find database " + dbName);
return true;