You are viewing a plain-text version of this content. (The original page linked to its canonical version at this point; that link is not preserved in plain text.)
Posted to commits@hive.apache.org by as...@apache.org on 2022/05/09 07:49:54 UTC

[hive] branch master updated: HIVE-26177: Create a new connection pool for compaction (DataNucleus) (Antal Sinkovits, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 915a17317d HIVE-26177: Create a new connection pool for compaction (DataNucleus) (Antal Sinkovits, reviewed by Denys Kuzmenko)
915a17317d is described below

commit 915a17317d13f7b2eb235b0f9fdd4fb0aaf0beb9
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Mon May 9 09:49:45 2022 +0200

    HIVE-26177: Create a new connection pool for compaction (DataNucleus) (Antal Sinkovits, reviewed by Denys Kuzmenko)
    
    Closes #3265
---
 .../ql/txn/compactor/MetaStoreCompactorThread.java |  7 +++++++
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  3 +++
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  4 +++-
 .../hive/metastore/PersistenceManagerProvider.java | 22 +++++++++++++++++-----
 4 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 6a451d5138..4f0b324230 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -39,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
 /**
@@ -56,6 +59,10 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
 
     // Get our own instance of the transaction handler
     txnHandler = TxnUtils.getTxnStore(conf);
+    // Initialize the RawStore, with the flag marked as true. Since its stored as a ThreadLocal variable in the
+    // HMSHandlerContext, it will use the compactor related pool.
+    MetastoreConf.setBoolVar(conf, COMPACTOR_USE_CUSTOM_POOL, true);
+    getMSForConf(conf);
   }
 
   @Override Table resolveTable(CompactionInfo ci) throws MetaException {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index a2f0e17a75..04f94212a8 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -438,6 +438,9 @@ public class MetastoreConf {
     COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", "hive.compactor.run.as.user", "",
         "Specify the user to run compactor Initiator and Worker as. If empty string, defaults to table/partition " +
         "directory owner."),
+    COMPACTOR_USE_CUSTOM_POOL("metastore.compactor.use.custom.pool", "hive.compactor.use.custom.pool",
+            false, "internal usage only -- use custom connection pool specific to compactor components."
+    ),
     COMPACTOR_OLDEST_REPLICATION_OPENTXN_THRESHOLD_WARNING(
         "metastore.compactor.oldest.replication.open.txn.threshold.warning",
         "hive.compactor.oldest.replication.open.txn.threshold.warning",
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9b7d28bfd0..9b5ef82fdc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang3.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
@@ -413,7 +414,8 @@ public class ObjectStore implements RawStore, Configurable {
     LOG.debug("ObjectStore, initialize called");
     // if this method fails, PersistenceManagerProvider will retry for the configured number of times
     // before giving up
-    pm = PersistenceManagerProvider.getPersistenceManager();
+    boolean isForCompactor = MetastoreConf.getBoolVar(conf, COMPACTOR_USE_CUSTOM_POOL);
+    pm = PersistenceManagerProvider.getPersistenceManager(isForCompactor);
     LOG.info("RawStore: {}, with PersistenceManager: {}" +
         " created in the thread with id: {}", this, pm, Thread.currentThread().getId());
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
index 870532aa91..d5b55b6e55 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -80,6 +80,7 @@ import java.util.function.Supplier;
  */
 public class PersistenceManagerProvider {
   private static PersistenceManagerFactory pmf;
+  private static PersistenceManagerFactory compactorPmf;
   private static Properties prop;
   private static final ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock();
   private static final Lock pmfReadLock = pmfLock.readLock();
@@ -209,7 +210,8 @@ public class PersistenceManagerProvider {
             retryInterval = MetastoreConf
                 .getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
             // init PMF with retry logic
-            retry(() -> {initPMF(conf); return null;});
+            pmf = retry(() -> initPMF(conf, true));
+            compactorPmf = retry(() -> initPMF(conf, false));
           }
           // downgrade by acquiring read lock before releasing write lock
           pmfReadLock.lock();
@@ -225,14 +227,17 @@ public class PersistenceManagerProvider {
     }
   }
 
-  private static void initPMF(Configuration conf) {
+  private static PersistenceManagerFactory initPMF(Configuration conf, boolean forCompactor) {
     DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+    PersistenceManagerFactory pmf;
 
     if (dsp == null) {
       pmf = JDOHelper.getPersistenceManagerFactory(prop);
     } else {
       try {
-        DataSource ds = dsp.create(conf);
+        DataSource ds =
+                forCompactor ? dsp.create(conf, MetastoreConf.getIntVar(conf, ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS)) :
+                        dsp.create(conf);
         Map<Object, Object> dsProperties = new HashMap<>();
         //Any preexisting datanucleus property should be passed along
         dsProperties.putAll(prop);
@@ -269,6 +274,7 @@ public class PersistenceManagerProvider {
       LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. "
           + "Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
     }
+    return pmf;
   }
 
   /**
@@ -399,13 +405,19 @@ public class PersistenceManagerProvider {
    * @return PersistenceManager from the current PersistenceManagerFactory instance
    */
   public static PersistenceManager getPersistenceManager() {
+    return getPersistenceManager(false);
+  }
+
+  public static PersistenceManager getPersistenceManager(boolean forCompactor) {
     pmfReadLock.lock();
     try {
-      if (pmf == null) {
+      if ((!forCompactor && pmf == null) || (forCompactor && compactorPmf == null)) {
         throw new RuntimeException(
             "Cannot create PersistenceManager. PersistenceManagerFactory is not yet initialized");
       }
-      return retry(pmf::getPersistenceManager);
+      return forCompactor ?
+              retry(compactorPmf::getPersistenceManager) :
+              retry(pmf::getPersistenceManager);
     } catch (Exception e) {
       throw new RuntimeException(e);
     } finally {