You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/01/31 04:07:38 UTC
svn commit: r1563038 - in
/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common:
HCatConstants.java HCatUtil.java HiveClientCache.java
Author: thejas
Date: Fri Jan 31 03:07:36 2014
New Revision: 1563038
URL: http://svn.apache.org/r1563038
Log:
HIVE-6268 : Network resource leak with HiveClientCache when using HCatInputFormat (Sushanth Sowmyan via Thejas Nair)
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1563038&r1=1563037&r2=1563038&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java Fri Jan 31 03:07:36 2014
@@ -61,6 +61,10 @@ public final class HCatConstants {
// hcatalog specific configurations, that can be put in hive-site.xml
public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = "hcatalog.hive.client.cache.expiry.time";
+ // config parameter that suggests to hcat that metastore clients not be cached - default is false
+ // this parameter allows highly-parallel hcat use cases to not gobble up too many connections that
+ // sit in the cache, while not in use.
+ public static final String HCAT_HIVE_CLIENT_DISABLE_CACHE = "hcatalog.hive.client.cache.disabled";
private HCatConstants() { // restrict instantiation
}
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java?rev=1563038&r1=1563037&r2=1563038&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java Fri Jan 31 03:07:36 2014
@@ -80,7 +80,6 @@ public class HCatUtil {
private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class);
private static volatile HiveClientCache hiveClientCache;
- private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
if (j.getConfiguration().get("mapred.task.id", "").equals("") &&
@@ -551,14 +550,16 @@ public class HCatUtil {
public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf)
throws MetaException, IOException {
- // Singleton behaviour: create the cache instance if required. The cache needs to be created lazily and
- // using the expiry time available in hiveConf.
+ if (hiveConf.getBoolean(HCatConstants.HCAT_HIVE_CLIENT_DISABLE_CACHE, false)){
+ // If cache is disabled, don't use it.
+ return HiveClientCache.getNonCachedHiveClient(hiveConf);
+ }
+ // Singleton behaviour: create the cache instance if required.
if (hiveClientCache == null) {
synchronized (HiveMetaStoreClient.class) {
if (hiveClientCache == null) {
- hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
- DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
+ hiveClientCache = new HiveClientCache(hiveConf);
}
}
}
@@ -569,6 +570,10 @@ public class HCatUtil {
}
}
+ private static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException{
+ return new HiveMetaStoreClient(hiveConf);
+ }
+
public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
try {
if (client != null)
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java?rev=1563038&r1=1563037&r2=1563038&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java Fri Jan 31 03:07:36 2014
@@ -18,10 +18,17 @@
*/
package org.apache.hive.hcatalog.common;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,18 +40,17 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.login.LoginException;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
/**
* A thread safe time expired cache for HiveMetaStoreClient
*/
class HiveClientCache {
+ public final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
+
final private Cache<HiveClientCacheKey, CacheableHiveMetaStoreClient> hiveCache;
private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class);
private final int timeout;
@@ -53,6 +59,8 @@ class HiveClientCache {
private static final AtomicInteger nextId = new AtomicInteger(0);
+ private ScheduledFuture<?> cleanupHandle; // used to cleanup cache
+
// Since HiveMetaStoreClient is not threadsafe, hive clients are not shared across threads.
// Thread local variable containing each thread's unique ID, is used as one of the keys for the cache
// causing each thread to get a different client even if the hiveConf is same.
@@ -68,6 +76,14 @@ class HiveClientCache {
return threadId.get();
}
+ public static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException {
+ return new HiveMetaStoreClient(hiveConf);
+ }
+
+ public HiveClientCache(HiveConf hiveConf) {
+ this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
+ DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
+ }
/**
* @param timeout the length of time in seconds after a client is created that it should be automatically removed
*/
@@ -90,6 +106,39 @@ class HiveClientCache {
.removalListener(removalListener)
.build();
+ // Add a maintenance task that will periodically attempt to trigger a cache cleanup
+ Runnable cleanupThread = new Runnable() {
+ public void run() {
+ hiveCache.cleanUp();
+ }
+ };
+
+ /**
+ * We need to use a cleanup interval, which is how often the cleanup thread will kick in
+ * and go do a check to see if any of the connections can be expired. We don't want to
+ * do this too often, because it'd be like having a mini-GC going off every so often,
+ * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS. If the client
+ * has explicitly set a larger timeout on the cache, though, we respect that, and use that
+ */
+ long cleanupInterval = DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS;
+
+ if (timeout > cleanupInterval){
+ cleanupInterval = timeout;
+ }
+
+ /**
+ * Create the cleanup handle. In addition to cleaning up every cleanupInterval, we add
+ * a slight offset, so that the very first time it runs, it runs with a slight delay, so
+ * as to catch any other connections that were closed when the first timeout happened.
+ * As a result, the time we can expect an unused connection to be reaped is
+ * 5 seconds after the first timeout, and then after that, it'll check for whether or not
+ * it can be cleaned every max(DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS,timeout) seconds
+ */
+ cleanupHandle = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(
+ cleanupThread,
+ timeout + 5, cleanupInterval, TimeUnit.SECONDS);
+
+
// Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up.
// This is the best effort approach. Ignore any error while doing so. Notice that most of the clients
// would get cleaned up via either the removalListener or the close() call, only the active clients
@@ -100,6 +149,7 @@ class HiveClientCache {
@Override
public void run() {
LOG.debug("Cleaning up hive client cache in ShutDown hook");
+ cleanupHandle.cancel(false); // Cancel the maintenance thread.
closeAllClientsQuietly();
}
};