You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/01/31 04:07:38 UTC

svn commit: r1563038 - in /hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common: HCatConstants.java HCatUtil.java HiveClientCache.java

Author: thejas
Date: Fri Jan 31 03:07:36 2014
New Revision: 1563038

URL: http://svn.apache.org/r1563038
Log:
HIVE-6268 : Network resource leak with HiveClientCache when using HCatInputFormat (Sushanth Sowmyan via Thejas Nair)

Modified:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1563038&r1=1563037&r2=1563038&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java Fri Jan 31 03:07:36 2014
@@ -61,6 +61,10 @@ public final class HCatConstants {
 
   // hcatalog specific configurations, that can be put in hive-site.xml
   public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = "hcatalog.hive.client.cache.expiry.time";
+  // config parameter that suggests to hcat that metastore clients not be cached - default is false
+  // this parameter allows highly-parallel hcat use cases to not gobble up too many connections that
+  // sit in the cache, while not in use.
+  public static final String HCAT_HIVE_CLIENT_DISABLE_CACHE = "hcatalog.hive.client.cache.disabled";
 
   private HCatConstants() { // restrict instantiation
   }

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java?rev=1563038&r1=1563037&r2=1563038&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java Fri Jan 31 03:07:36 2014
@@ -80,7 +80,6 @@ public class HCatUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class);
   private static volatile HiveClientCache hiveClientCache;
-  private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
 
   public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
     if (j.getConfiguration().get("mapred.task.id", "").equals("") &&
@@ -551,14 +550,16 @@ public class HCatUtil {
   public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf)
     throws MetaException, IOException {
 
-    // Singleton behaviour: create the cache instance if required. The cache needs to be created lazily and
-    // using the expiry time available in hiveConf.
+    if (hiveConf.getBoolean(HCatConstants.HCAT_HIVE_CLIENT_DISABLE_CACHE, false)){
+      // If cache is disabled, don't use it.
+      return HiveClientCache.getNonCachedHiveClient(hiveConf);
+    }
 
+    // Singleton behaviour: create the cache instance if required.
     if (hiveClientCache == null) {
       synchronized (HiveMetaStoreClient.class) {
         if (hiveClientCache == null) {
-          hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
-            DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
+          hiveClientCache = new HiveClientCache(hiveConf);
         }
       }
     }
@@ -569,6 +570,10 @@ public class HCatUtil {
     }
   }
 
+  private static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException{
+    return new HiveMetaStoreClient(hiveConf);
+  }
+
   public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
     try {
       if (client != null)

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java?rev=1563038&r1=1563037&r2=1563038&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java Fri Jan 31 03:07:36 2014
@@ -18,10 +18,17 @@
  */
 package org.apache.hive.hcatalog.common;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,18 +40,17 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.login.LoginException;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 
 /**
  * A thread safe time expired cache for HiveMetaStoreClient
  */
 class HiveClientCache {
+  public final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
+
   final private Cache<HiveClientCacheKey, CacheableHiveMetaStoreClient> hiveCache;
   private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class);
   private final int timeout;
@@ -53,6 +59,8 @@ class HiveClientCache {
 
   private static final AtomicInteger nextId = new AtomicInteger(0);
 
+  private ScheduledFuture<?> cleanupHandle; // used to cleanup cache
+
   // Since HiveMetaStoreClient is not threadsafe, hive clients are not  shared across threads.
   // Thread local variable containing each thread's unique ID, is used as one of the keys for the cache
   // causing each thread to get a different client even if the hiveConf is same.
@@ -68,6 +76,14 @@ class HiveClientCache {
     return threadId.get();
   }
 
+  public static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException {
+    return new HiveMetaStoreClient(hiveConf);
+  }
+
+  public HiveClientCache(HiveConf hiveConf) {
+    this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
+        DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
+  }
   /**
    * @param timeout the length of time in seconds after a client is created that it should be automatically removed
    */
@@ -90,6 +106,39 @@ class HiveClientCache {
       .removalListener(removalListener)
       .build();
 
+    // Add a maintenance task that will periodically trigger a cache clean
+    Runnable cleanupThread = new Runnable() {
+      public void run() {
+        hiveCache.cleanUp();
+      }
+    };
+
+    /**
+     * We need to use a cleanup interval, which is how often the cleanup thread will kick in
+     * and go do a check to see if any of the connections can be expired. We don't want to
+     * do this too often, because it'd be like having a mini-GC going off every so often,
+     * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS. If the client
+     * has explicitly set a larger timeout on the cache, though, we respect that, and use that
+     */
+    long cleanupInterval = DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS;
+
+    if (timeout > cleanupInterval){
+      cleanupInterval = timeout;
+    }
+
+    /**
+     * Create the cleanup handle. In addition to cleaning up every cleanupInterval, we add
+     * a slight offset, so that the very first time it runs, it runs with a slight delay, so
+     * as to catch any other connections that were closed when the first timeout happened.
+     * As a result, the time we can expect an unused connection to be reaped is
+     * 5 seconds after the first timeout, and then after that, it'll check for whether or not
+     * it can be cleaned every max(DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS,timeout) seconds
+     */
+    cleanupHandle = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(
+        cleanupThread,
+        timeout + 5, cleanupInterval, TimeUnit.SECONDS);
+
+
     // Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up.
     // This is the best effort approach. Ignore any error while doing so. Notice that most of the clients
     // would get cleaned up via either the removalListener or the close() call, only the active clients
@@ -100,6 +149,7 @@ class HiveClientCache {
       @Override
       public void run() {
         LOG.debug("Cleaning up hive client cache in ShutDown hook");
+        cleanupHandle.cancel(false); // Cancel the maintenance thread.
         closeAllClientsQuietly();
       }
     };