Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/27 15:37:34 UTC

[13/48] hive git commit: HIVE-13561: HiveServer2 is leaking ClassLoaders when add jar / temporary functions are used (Trystan Leftwich reviewed by Vaibhav Gumashta)

HIVE-13561: HiveServer2 is leaking ClassLoaders when add jar / temporary functions are used (Trystan Leftwich reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/68a42108
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/68a42108
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/68a42108

Branch: refs/heads/java8
Commit: 68a4210808ecf965d9d8bb4c934cb548c334fe72
Parents: b420e1d
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Fri May 27 00:23:25 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Fri May 27 00:23:25 2016 -0700

----------------------------------------------------------------------
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   | 145 +++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/Registry.java    |  10 +-
 2 files changed, 150 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
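
For context on the fix: Hadoop's ReflectionUtils keeps a static constructor cache whose strong references to UDF classes pin the per-session ClassLoader created by ADD JAR, so that loader (and every class it defined) can never be collected. Hive's ReflectionUtil backs its CONSTRUCTOR_CACHE with a Guava cache that can drop entries, which is what the new test exercises. The sketch below contrasts the two strategies; it is illustrative only, with hypothetical class and field names rather than Hive's actual implementation.

import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class ConstructorCacheSketch {
  // Leaky variant: strong references to the Class objects keep their defining
  // ClassLoader (e.g. the per-session loader created by ADD JAR) reachable forever.
  private static final Map<Class<?>, Constructor<?>> STRONG_CACHE = new ConcurrentHashMap<>();

  // Collectable variant: weak keys/values plus expiry let entries (and the session
  // ClassLoader behind them) be reclaimed once nothing else references them.
  private static final Cache<Class<?>, Constructor<?>> WEAK_CACHE = CacheBuilder.newBuilder()
      .expireAfterAccess(5, TimeUnit.MINUTES).weakKeys().weakValues().build();

  static <T> T newInstanceLeaky(Class<T> clazz) throws Exception {
    Constructor<?> ctor = STRONG_CACHE.get(clazz);
    if (ctor == null) {
      ctor = clazz.getDeclaredConstructor();
      ctor.setAccessible(true);
      STRONG_CACHE.put(clazz, ctor);
    }
    return clazz.cast(ctor.newInstance());
  }

  static <T> T newInstanceCollectable(Class<T> clazz) throws Exception {
    Constructor<?> ctor = WEAK_CACHE.get(clazz, () -> {
      Constructor<?> c = clazz.getDeclaredConstructor();
      c.setAccessible(true);
      return c;
    });
    return clazz.cast(ctor.newInstance());
  }
}

The test below installs a variant of the second cache with a 5-second expireAfterAccess, so that after the connection is closed and expired entries are cleaned up the cache is observably empty.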


http://git-wip-us.apache.org/repos/asf/hive/blob/68a42108/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 4aa98ca..a01daa4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -46,6 +48,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -54,6 +58,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.datanucleus.ClassLoaderResolver;
 import org.datanucleus.NucleusContext;
@@ -943,4 +948,144 @@ public class TestJdbcWithMiniHS2 {
     }
     return -1;
   }
+
+  /**
+   * Tests that ADD JAR uses Hive's ReflectionUtil.CONSTRUCTOR_CACHE and that the cached entry is released after the connection is closed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAddJarConstructorUnCaching() throws Exception {
+    // This test assumes the hive-contrib JAR has been built as part of the Hive build
+    // and depends on the UDFExampleAdd class inside that JAR.
+    setReflectionUtilCache();
+    String udfClassName = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd";
+    String mvnRepo = System.getProperty("maven.local.repository");
+    String hiveVersion = System.getProperty("hive.version");
+    String jarFileName = "hive-contrib-" + hiveVersion + ".jar";
+    String[] pathParts = {
+        "org", "apache", "hive",
+        "hive-contrib", hiveVersion, jarFileName
+    };
+
+    // Create path to hive-contrib JAR on local filesystem
+    Path jarFilePath = new Path(mvnRepo);
+    for (String pathPart : pathParts) {
+      jarFilePath = new Path(jarFilePath, pathPart);
+    }
+
+    Connection conn = getConnection(miniHS2.getJdbcURL(), "foo", "bar");
+    String tableName = "testAddJar";
+    Statement stmt = conn.createStatement();
+    stmt.execute("SET hive.support.concurrency = false");
+    // Create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    stmt.execute("CREATE TABLE " + tableName + " (key INT, value STRING)");
+    // Load data
+    stmt.execute("LOAD DATA LOCAL INPATH '" + kvDataFilePath.toString() + "' INTO TABLE "
+        + tableName);
+    ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+    // Ensure table is populated
+    assertTrue(res.next());
+
+    long cacheBeforeAddJar, cacheAfterAddJar, cacheAfterClose;
+    // Force a cache clear so we know it's empty
+    invalidateReflectionUtilCache();
+    cacheBeforeAddJar = getReflectionUtilCacheSize();
+    System.out.println("CONSTRUCTOR_CACHE size before add jar: " + cacheBeforeAddJar);
+    System.out.println("CONSTRUCTOR_CACHE as map before add jar:" + getReflectionUtilCache().asMap());
+    Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size before add jar: " + cacheBeforeAddJar,
+            cacheBeforeAddJar == 0);
+
+    // Add the jar file
+    stmt.execute("ADD JAR " + jarFilePath.toString());
+    // Create a temporary function using the jar
+    stmt.execute("CREATE TEMPORARY FUNCTION func AS '" + udfClassName + "'");
+    // Execute the UDF
+    res = stmt.executeQuery("SELECT func(value) from " + tableName);
+    assertTrue(res.next());
+
+    // Check to make sure the cache is now being used
+    cacheAfterAddJar = getReflectionUtilCacheSize();
+    System.out.println("CONSTRUCTOR_CACHE size after add jar: " + cacheAfterAddJar);
+    Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after connection close: " + cacheAfterAddJar,
+            cacheAfterAddJar > 0);
+    conn.close();
+    TimeUnit.SECONDS.sleep(10);
+    // Have to force a cleanup of all expired entries here because it's possible that the
+    // expired entries will still be counted in Cache.size().
+    // Taken from:
+    // http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
+    cleanUpReflectionUtilCache();
+    cacheAfterClose = getReflectionUtilCacheSize();
+    System.out.println("CONSTRUCTOR_CACHE size after connection close: " + cacheAfterClose);
+    Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after connection close: " + cacheAfterClose,
+            cacheAfterClose == 0);
+  }
+
+  private void setReflectionUtilCache() {
+    Field constructorCacheField;
+    Cache<Class<?>, Constructor<?>> tmp;
+    try {
+      constructorCacheField = ReflectionUtil.class.getDeclaredField("CONSTRUCTOR_CACHE");
+      if (constructorCacheField != null) {
+        constructorCacheField.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(constructorCacheField, constructorCacheField.getModifiers() & ~Modifier.FINAL);
+        tmp = CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.SECONDS).concurrencyLevel(64).weakKeys().weakValues().build();
+        constructorCacheField.set(null, tmp); // static field: the instance argument is ignored, pass null
+      }
+    } catch (Exception e) {
+      System.out.println("Error when setting the CONSTRUCTOR_CACHE to expire: " + e);
+    }
+  }
+
+  private Cache getReflectionUtilCache() {
+    Field constructorCacheField;
+    try {
+      constructorCacheField = ReflectionUtil.class.getDeclaredField("CONSTRUCTOR_CACHE");
+      if (constructorCacheField != null) {
+        constructorCacheField.setAccessible(true);
+        return (Cache) constructorCacheField.get(null);
+      }
+    } catch (Exception e) {
+      System.out.println("Error when getting the CONSTRUCTOR_CACHE var: " + e);
+    }
+    return null;
+  }
+
+  private void invalidateReflectionUtilCache() {
+    try {
+      Cache constructorCache = getReflectionUtilCache();
+      if (constructorCache != null) {
+        constructorCache.invalidateAll();
+      }
+    } catch (Exception e) {
+      System.out.println("Error when trying to invalidate the cache: " + e);
+    }
+  }
+
+  private void cleanUpReflectionUtilCache() {
+    try {
+      Cache constructorCache = getReflectionUtilCache();
+      if (constructorCache != null) {
+        constructorCache.cleanUp();
+      }
+    } catch (Exception e) {
+      System.out.println("Error when trying to cleanUp the cache: " + e);
+    }
+  }
+
+  private long getReflectionUtilCacheSize() {
+    try {
+      Cache constructorCache = getReflectionUtilCache();
+      if (constructorCache != null) {
+        return constructorCache.size();
+      }
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/68a42108/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index 3b54b49..891514b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.common.util.ReflectionUtil;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -125,7 +125,7 @@ public class Registry {
       case GENERIC_UDAF_RESOLVER:
         return registerGenericUDAF(
             functionName, (GenericUDAFResolver)
-            ReflectionUtils.newInstance(udfClass, null), resources);
+            ReflectionUtil.newInstance(udfClass, null), resources);
       case TABLE_FUNCTION_RESOLVER:
         // native or not would be decided by annotation. need to evaluate that first
         return registerTableFunction(functionName,
@@ -154,7 +154,7 @@ public class Registry {
       Class<? extends GenericUDF> genericUDFClass, FunctionResource... resources) {
     validateClass(genericUDFClass, GenericUDF.class);
     FunctionInfo fI = new FunctionInfo(isNative, functionName,
-        ReflectionUtils.newInstance(genericUDFClass, null), resources);
+        ReflectionUtil.newInstance(genericUDFClass, null), resources);
     addFunction(functionName, fI);
     return fI;
   }
@@ -179,7 +179,7 @@ public class Registry {
       Class<? extends GenericUDTF> genericUDTFClass, FunctionResource... resources) {
     validateClass(genericUDTFClass, GenericUDTF.class);
     FunctionInfo fI = new FunctionInfo(isNative, functionName,
-        ReflectionUtils.newInstance(genericUDTFClass, null), resources);
+        ReflectionUtil.newInstance(genericUDTFClass, null), resources);
     addFunction(functionName, fI);
     return fI;
   }
@@ -197,7 +197,7 @@ public class Registry {
       Class<? extends UDAF> udafClass, FunctionResource... resources) {
     validateClass(udafClass, UDAF.class);
     FunctionInfo function = new WindowFunctionInfo(isNative, functionName,
-        new GenericUDAFBridge(ReflectionUtils.newInstance(udafClass, null)), resources);
+        new GenericUDAFBridge(ReflectionUtil.newInstance(udafClass, null)), resources);
     addFunction(functionName, function);
     addFunction(WINDOW_FUNC_PREFIX + functionName, function);
     return function;
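
The Registry change above is mechanical: every UDF, UDAF and UDTF instantiation now goes through Hive's ReflectionUtil rather than Hadoop's ReflectionUtils, so the cached constructors end up in the evictable CONSTRUCTOR_CACHE that the new test exercises. A rough usage sketch follows; only the newInstance call shape visible in the diff is assumed, and the class name here is hypothetical.

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hive.common.util.ReflectionUtil;

public final class RegistrySketch {
  // Mirrors the patched Registry: instantiate the UDF class through the
  // cache-backed helper, passing null for the second argument exactly as the
  // diff does. The constructor lookup is served from CONSTRUCTOR_CACHE, which
  // can evict entries and so release the session ClassLoader behind udfClass.
  static GenericUDF instantiate(Class<? extends GenericUDF> udfClass) {
    return ReflectionUtil.newInstance(udfClass, null);
  }
}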