You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by as...@apache.org on 2022/05/09 07:49:54 UTC
[hive] branch master updated: HIVE-26177: Create a new connection pool for compaction (DataNucleus) (Antal Sinkovits, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 915a17317d HIVE-26177: Create a new connection pool for compaction (DataNucleus) (Antal Sinkovits, reviewed by Denys Kuzmenko)
915a17317d is described below
commit 915a17317d13f7b2eb235b0f9fdd4fb0aaf0beb9
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Mon May 9 09:49:45 2022 +0200
HIVE-26177: Create a new connection pool for compaction (DataNucleus) (Antal Sinkovits, reviewed by Denys Kuzmenko)
Closes #3265
---
.../ql/txn/compactor/MetaStoreCompactorThread.java | 7 +++++++
.../hadoop/hive/metastore/conf/MetastoreConf.java | 3 +++
.../apache/hadoop/hive/metastore/ObjectStore.java | 4 +++-
.../hive/metastore/PersistenceManagerProvider.java | 22 +++++++++++++++++-----
4 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 6a451d5138..4f0b324230 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -39,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
/**
@@ -56,6 +59,10 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
// Get our own instance of the transaction handler
txnHandler = TxnUtils.getTxnStore(conf);
+ // Initialize the RawStore with the flag set to true. Since it is stored as a ThreadLocal variable in the
+ // HMSHandlerContext, it will use the compactor-related pool.
+ MetastoreConf.setBoolVar(conf, COMPACTOR_USE_CUSTOM_POOL, true);
+ getMSForConf(conf);
}
@Override Table resolveTable(CompactionInfo ci) throws MetaException {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index a2f0e17a75..04f94212a8 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -438,6 +438,9 @@ public class MetastoreConf {
COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", "hive.compactor.run.as.user", "",
"Specify the user to run compactor Initiator and Worker as. If empty string, defaults to table/partition " +
"directory owner."),
+ COMPACTOR_USE_CUSTOM_POOL("metastore.compactor.use.custom.pool", "hive.compactor.use.custom.pool",
+ false, "internal usage only -- use custom connection pool specific to compactor components."
+ ),
COMPACTOR_OLDEST_REPLICATION_OPENTXN_THRESHOLD_WARNING(
"metastore.compactor.oldest.replication.open.txn.threshold.warning",
"hive.compactor.oldest.replication.open.txn.threshold.warning",
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9b7d28bfd0..9b5ef82fdc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.metastore;
import static org.apache.commons.lang3.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
@@ -413,7 +414,8 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("ObjectStore, initialize called");
// if this method fails, PersistenceManagerProvider will retry for the configured number of times
// before giving up
- pm = PersistenceManagerProvider.getPersistenceManager();
+ boolean isForCompactor = MetastoreConf.getBoolVar(conf, COMPACTOR_USE_CUSTOM_POOL);
+ pm = PersistenceManagerProvider.getPersistenceManager(isForCompactor);
LOG.info("RawStore: {}, with PersistenceManager: {}" +
" created in the thread with id: {}", this, pm, Thread.currentThread().getId());
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
index 870532aa91..d5b55b6e55 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -80,6 +80,7 @@ import java.util.function.Supplier;
*/
public class PersistenceManagerProvider {
private static PersistenceManagerFactory pmf;
+ private static PersistenceManagerFactory compactorPmf;
private static Properties prop;
private static final ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock();
private static final Lock pmfReadLock = pmfLock.readLock();
@@ -209,7 +210,8 @@ public class PersistenceManagerProvider {
retryInterval = MetastoreConf
.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
// init PMF with retry logic
- retry(() -> {initPMF(conf); return null;});
+ pmf = retry(() -> initPMF(conf, false)); // general-purpose factory: forCompactor=false
+ compactorPmf = retry(() -> initPMF(conf, true)); // compactor factory: forCompactor=true
}
// downgrade by acquiring read lock before releasing write lock
pmfReadLock.lock();
@@ -225,14 +227,17 @@ public class PersistenceManagerProvider {
}
}
- private static void initPMF(Configuration conf) {
+ private static PersistenceManagerFactory initPMF(Configuration conf, boolean forCompactor) {
DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+ PersistenceManagerFactory pmf;
if (dsp == null) {
pmf = JDOHelper.getPersistenceManagerFactory(prop);
} else {
try {
- DataSource ds = dsp.create(conf);
+ DataSource ds =
+ forCompactor ? dsp.create(conf, MetastoreConf.getIntVar(conf, ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS)) :
+ dsp.create(conf);
Map<Object, Object> dsProperties = new HashMap<>();
//Any preexisting datanucleus property should be passed along
dsProperties.putAll(prop);
@@ -269,6 +274,7 @@ public class PersistenceManagerProvider {
LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. "
+ "Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
}
+ return pmf;
}
/**
@@ -399,13 +405,19 @@ public class PersistenceManagerProvider {
* @return PersistenceManager from the current PersistenceManagerFactory instance
*/
public static PersistenceManager getPersistenceManager() {
+ return getPersistenceManager(false);
+ }
+
+ public static PersistenceManager getPersistenceManager(boolean forCompactor) {
pmfReadLock.lock();
try {
- if (pmf == null) {
+ if ((!forCompactor && pmf == null) || (forCompactor && compactorPmf == null)) {
throw new RuntimeException(
"Cannot create PersistenceManager. PersistenceManagerFactory is not yet initialized");
}
- return retry(pmf::getPersistenceManager);
+ return forCompactor ?
+ retry(compactorPmf::getPersistenceManager) :
+ retry(pmf::getPersistenceManager);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {