You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/08/21 07:59:15 UTC

[hive] branch master updated: HIVE-22081: Hivemetastore Performance: Compaction Initiator Thread overwhelmed if there are too many Table/partitions are eligible for compaction (Rajkumar Singh, via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e76f613  HIVE-22081: Hivemetastore Performance: Compaction Initiator Thread overwhelmed if there are too many Table/partitions are eligible for compaction (Rajkumar Singh, via Peter Vary)
e76f613 is described below

commit e76f613e19037b28f1a82ce9a61adbb4d78693cd
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Wed Aug 21 09:59:04 2019 +0200

    HIVE-22081: Hivemetastore Performance: Compaction Initiator Thread overwhelmed if there are too many Table/partitions are eligible for compaction (Rajkumar Singh, via Peter Vary)
---
 .../hive/ql/txn/compactor/CompactorThread.java     |  6 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 91 ++++++++++++++--------
 .../ql/txn/compactor/MetaStoreCompactorThread.java |  6 +-
 3 files changed, 67 insertions(+), 36 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 94f0031..99da86f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -224,6 +224,18 @@ public abstract class CompactorThread extends Thread implements Configurable {
 
+  /**
+   * Checks whether compaction is disabled for a replicated table that has not yet completed its
+   * first successful incremental load (see HIVE-21197).
+   *
+   * @param tbl the table to check
+   * @return true if compaction must be skipped for this table; the db-qualified table name is
+   *     logged in that case
+   */
   protected boolean replIsCompactionDisabledForTable(Table tbl) {
     // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-    return ReplUtils.isFirstIncPending(tbl.getParameters());
+    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
+    if (isCompactDisabled) {
+      LOG.info("Compaction is disabled for table " + tbl.getDbName() + "." + tbl.getTableName());
+    }
+    return isCompactDisabled;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5fb2552..610cf05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -51,11 +51,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * A class to initiate compactions.  This will run in a separate thread.
@@ -66,6 +68,8 @@ public class Initiator extends MetaStoreCompactorThread {
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
+  // Full table name -> user compactions run as (from findUserToRunAs). NOTE(review): never cleared in this patch.
+  Map<String, String> tblNameOwnersCache = new HashMap<>();
 
   private long checkInterval;
 
@@ -93,7 +97,8 @@ public class Initiator extends MetaStoreCompactorThread {
           startedAt = System.currentTimeMillis();
           //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+                  .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
           for (CompactionInfo ci : potentials) {
@@ -105,40 +110,8 @@ public class Initiator extends MetaStoreCompactorThread {
             }
             LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
             try {
-              if (replIsCompactionDisabledForDatabase(ci.dbname)) {
-                // Compaction is disabled for replicated database until after first successful incremental load.
-                LOG.info("Compaction is disabled for database " + ci.dbname);
-                continue;
-              }
 
               Table t = resolveTable(ci);
-              if (t == null) {
-                // Most likely this means it's a temp table
-                LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
-                    "table or has been dropped and moving on.");
-                continue;
-              }
-
-              // check if no compaction set for this table
-              if (noAutoCompactSet(t)) {
-                LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it.");
-                continue;
-              }
-
-              if (replIsCompactionDisabledForTable(t)) {
-                // Compaction is disabled for replicated table until after first successful incremental load.
-                LOG.info("Compaction is disabled for table " + ci.getFullTableName());
-                continue;
-              }
-
-              // Check to see if this is a table level request on a partitioned table.  If so,
-              // then it's a dynamic partitioning case and we shouldn't check the table itself.
-              if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
-                  ci.partName  == null) {
-                LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
-                    " partitioning");
-                continue;
-              }
 
               // Check if we already have initiated or are working on a compaction for this partition
               // or table.  If so, skip it.  If we are just waiting on cleaning we can still check,
@@ -176,7 +149,13 @@ public class Initiator extends MetaStoreCompactorThread {
                       txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
 
               StorageDescriptor sd = resolveStorageDescriptor(t, p);
-              String runAs = findUserToRunAs(sd.getLocation(), t);
+              String runAs = tblNameOwnersCache.get(fullTableName);
+              if (runAs == null) {
+                LOG.debug("unable to find the table owner in the cache for table "+ fullTableName + " " +
+                            "will determine user based on table location");
+                runAs = findUserToRunAs(sd.getLocation(), t);
+                tblNameOwnersCache.put(fullTableName, runAs);
+              }
               /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
               * Long term we should consider having a thread pool here and running checkForCompactionS
               * in parallel*/
@@ -220,6 +199,7 @@ public class Initiator extends MetaStoreCompactorThread {
     }
   }
 
+
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     super.init(stop, looped);
@@ -398,4 +378,70 @@ public class Initiator extends MetaStoreCompactorThread {
     }
     return noAutoCompact != null && noAutoCompact.equalsIgnoreCase("true");
   }
+
+  /**
+   * Checks whether a compaction candidate is a table-level entry on a partitioned table. Such an
+   * entry comes from dynamic partitioning, so the table itself should not be checked.
+   *
+   * @param t the table the candidate belongs to
+   * @param ci the compaction candidate
+   * @return true if the candidate should be skipped
+   */
+  private static boolean checkDynPartitioning(Table t, CompactionInfo ci) {
+    if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty() && ci.partName == null) {
+      LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
+              " partitioning");
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Decides whether a potential compaction returned by the metastore should go on to the
+   * per-candidate checks of the main Initiator loop. A candidate is rejected when its table cannot
+   * be resolved, when replication has disabled compaction for its database or table, when the
+   * table is marked {@code TABLE_NO_AUTO_COMPACT=true}, or when it is a table-level entry of a
+   * partitioned table.
+   *
+   * <p>Unexpected errors fail closed: the candidate is logged, marked failed and rejected.
+   * Returning true instead would let it skip these checks, which the main loop no longer repeats.
+   *
+   * @param ci the compaction candidate
+   * @return true if the candidate is eligible for further compaction checks
+   */
+  private boolean checkCompactionElig(CompactionInfo ci) {
+    try {
+      Table t = resolveTable(ci);
+      if (t == null) {
+        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
+                "table or has been dropped and moving on.");
+        return false;
+      }
+
+      if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+        return false;
+      }
+
+      if (noAutoCompactSet(t)) {
+        LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT +
+                "=true so we will not compact it.");
+        return false;
+      }
+
+      // Both helpers log their own reason when they reject the candidate.
+      return !replIsCompactionDisabledForTable(t) && !checkDynPartitioning(t, ci);
+    } catch (Throwable e) {
+      LOG.error("Caught exception while checking compaction eligibility of " + ci +
+              ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
+      try {
+        // NOTE(review): intended to stop the candidate from failing again on every cycle;
+        // confirm against TxnStore#markFailed.
+        txnHandler.markFailed(ci);
+      } catch (Exception markFailedEx) {
+        LOG.error("Unable to mark " + ci + " as failed: " +
+                StringUtils.stringifyException(markFailedEx));
+      }
+      return false;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index a6dd4fa..8bfb524 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -78,7 +78,11 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
     try {
       Database database = rs.getDatabase(getDefaultCatalog(conf), dbName);
       // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-      return ReplUtils.isFirstIncPending(database.getParameters());
+      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
+      if (isReplCompactDisabled) {
+        LOG.info("Compaction is disabled for database " + dbName);
+      }
+      return isReplCompactDisabled;
     } catch (NoSuchObjectException e) {
       LOG.info("Unable to find database " + dbName);
       return true;