Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:31:01 UTC

[07/11] hive git commit: HIVE-13596 : HS2 should be able to get UDFs on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)

HIVE-13596 : HS2 should be able to get UDFs on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6460529a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6460529a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6460529a

Branch: refs/heads/llap
Commit: 6460529aa910d7dd4af499d4adf85cbcf67452ce
Parents: 1289aff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 11:01:34 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 11:01:34 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../apache/hadoop/hive/ql/exec/Registry.java    | 323 ++++++++++++-------
 .../hadoop/hive/ql/session/SessionState.java    |   3 +-
 3 files changed, 218 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index db4b9e8..46a3b96 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2322,6 +2322,9 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_BUILTIN_UDF_BLACKLIST("hive.server2.builtin.udf.blacklist", "",
          "Comma separated list of udfs names. These udfs will not be allowed in queries." +
          " The udf black list takes precedence over udf white list"),
+     HIVE_ALLOW_UDF_LOAD_ON_DEMAND("hive.allow.udf.load.on.demand", false,
+         "Whether enable loading UDFs from metastore on demand; this is mostly relevant for\n" +
+         "HS2 and was the default behavior before Hive 1.2. Off by default."),
 
     HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
         new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
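
For reference, turning the new flag on amounts to setting this boolean in the HS2
configuration before sessions are created. The HiveConf/ConfVars names below come from
the hunk above; the surrounding setup code is only a hypothetical sketch, not part of
the patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    // Hypothetical setup: enable on-demand UDF loading so a miss in the HS2-wide
    // registry falls through to the metastore (see the Registry.java change below).
    HiveConf conf = new HiveConf();
    conf.setBoolVar(ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND, true);

    // The registry reads the flag back the same way the patch does:
    boolean onDemand = HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND);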

http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index d5f4a37..3b54b49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -24,8 +24,12 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -46,7 +50,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
-import java.net.URLClassLoader;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -56,6 +59,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -80,14 +84,16 @@ public class Registry {
   private final Set<ClassLoader> mSessionUDFLoaders = new LinkedHashSet<ClassLoader>();
 
   private final boolean isNative;
+  /**
+   * The epic lock for the registry. This was added to replace the synchronized methods with
+   * minimum disruption; the locking should really be made more granular here.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
 
-  Registry(boolean isNative) {
+  public Registry(boolean isNative) {
     this.isNative = isNative;
   }
 
-  public Registry() {
-    this(false);
-  }
 
   /**
    * Registers the appropriate kind of temporary function based on a class's
@@ -157,11 +163,16 @@ public class Registry {
    * Registers the UDF class as a built-in function; used for dynamically created UDFs, like
    * GenericUDFOP*Minus/Plus.
    */
-  public synchronized void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
-    if (!isNative) {
-      throw new RuntimeException("Builtin is not for this registry");
+  public void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
+    lock.lock();
+    try {
+      if (!isNative) {
+        throw new RuntimeException("Builtin is not for this registry");
+      }
+      builtIns.add(functionClass);
+    } finally {
+      lock.unlock();
     }
-    builtIns.add(functionClass);
   }
 
   public FunctionInfo registerGenericUDTF(String functionName,
@@ -256,23 +267,29 @@ public class Registry {
    * @param functionName
    * @return
    */
-  public synchronized FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
-    functionName = functionName.toLowerCase();
-    if (FunctionUtils.isQualifiedFunctionName(functionName)) {
-      return getQualifiedFunctionInfo(functionName);
-    }
-    // First try without qualifiers - would resolve builtin/temp functions.
-    // Otherwise try qualifying with current db name.
-    FunctionInfo functionInfo = mFunctions.get(functionName);
-    if (functionInfo != null && functionInfo.isBlockedFunction()) {
-      throw new SemanticException ("UDF " + functionName + " is not allowed");
-    }
-    if (functionInfo == null) {
-      String qualifiedName = FunctionUtils.qualifyFunctionName(
-          functionName, SessionState.get().getCurrentDatabase().toLowerCase());
-      functionInfo = getQualifiedFunctionInfo(qualifiedName);
-    }
+  public FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
+    lock.lock();
+    try {
+      functionName = functionName.toLowerCase();
+      if (FunctionUtils.isQualifiedFunctionName(functionName)) {
+        return getQualifiedFunctionInfoUnderLock(functionName);
+      }
+      // First try without qualifiers - would resolve builtin/temp functions.
+      // Otherwise try qualifying with current db name.
+      FunctionInfo functionInfo = mFunctions.get(functionName);
+      if (functionInfo != null && functionInfo.isBlockedFunction()) {
+        throw new SemanticException ("UDF " + functionName + " is not allowed");
+      }
+      if (functionInfo == null) {
+        String qualifiedName = FunctionUtils.qualifyFunctionName(
+            functionName, SessionState.get().getCurrentDatabase().toLowerCase());
+        functionInfo = getQualifiedFunctionInfoUnderLock(qualifiedName);
+      }
     return functionInfo;
+    } finally {
+      lock.unlock();
+    }
+
   }
 
   public WindowFunctionInfo getWindowFunctionInfo(String functionName) throws SemanticException {
@@ -295,15 +312,23 @@ public class Registry {
     return udfClass != null && persistent.containsKey(udfClass);
   }
 
-  public synchronized Set<String> getCurrentFunctionNames() {
-    return getFunctionNames((Pattern)null);
+  public Set<String> getCurrentFunctionNames() {
+    lock.lock();
+    try {
+      return getFunctionNames((Pattern)null);
+    } finally {
+      lock.unlock();
+    }
   }
 
-  public synchronized Set<String> getFunctionNames(String funcPatternStr) {
+  public Set<String> getFunctionNames(String funcPatternStr) {
+    lock.lock();
     try {
       return getFunctionNames(Pattern.compile(funcPatternStr));
     } catch (PatternSyntaxException e) {
       return Collections.emptySet();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -315,17 +340,22 @@ public class Registry {
    * @param funcPattern regular expression of the interested function names
    * @return set of strings contains function names
    */
-  public synchronized Set<String> getFunctionNames(Pattern funcPattern) {
-    Set<String> funcNames = new TreeSet<String>();
-    for (String funcName : mFunctions.keySet()) {
-      if (funcName.contains(WINDOW_FUNC_PREFIX)) {
-        continue;
-      }
-      if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
-        funcNames.add(funcName);
+  public Set<String> getFunctionNames(Pattern funcPattern) {
+    lock.lock();
+    try {
+      Set<String> funcNames = new TreeSet<String>();
+      for (String funcName : mFunctions.keySet()) {
+        if (funcName.contains(WINDOW_FUNC_PREFIX)) {
+          continue;
+        }
+        if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
+          funcNames.add(funcName);
+        }
       }
+      return funcNames;
+    } finally {
+      lock.unlock();
     }
-    return funcNames;
   }
 
   /**
@@ -334,18 +364,23 @@ public class Registry {
    * @param funcInfo
    * @param synonyms
    */
-  public synchronized void getFunctionSynonyms(
+  public void getFunctionSynonyms(
       String funcName, FunctionInfo funcInfo, Set<String> synonyms) throws SemanticException {
-    Class<?> funcClass = funcInfo.getFunctionClass();
-    for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
-      String name = entry.getKey();
-      if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
-        continue;
-      }
-      FunctionInfo function = entry.getValue();
-      if (function.getFunctionClass() == funcClass) {
-        synonyms.add(name);
+    lock.lock();
+    try {
+      Class<?> funcClass = funcInfo.getFunctionClass();
+      for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
+        String name = entry.getKey();
+        if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
+          continue;
+        }
+        FunctionInfo function = entry.getValue();
+        if (function.getFunctionClass() == funcClass) {
+          synonyms.add(name);
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -409,26 +444,31 @@ public class Registry {
     return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
   }
 
-  private synchronized void addFunction(String functionName, FunctionInfo function) {
-    if (isNative ^ function.isNative()) {
-      throw new RuntimeException("Function " + functionName + " is not for this registry");
-    }
-    functionName = functionName.toLowerCase();
-    FunctionInfo prev = mFunctions.get(functionName);
-    if (prev != null) {
-      if (isBuiltInFunc(prev.getFunctionClass())) {
-        throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
-            "which cannot be overriden.");
+  private void addFunction(String functionName, FunctionInfo function) {
+    lock.lock();
+    try {
+      if (isNative != function.isNative()) {
+        throw new RuntimeException("Function " + functionName + " is not for this registry");
       }
-      prev.discarded();
-    }
-    mFunctions.put(functionName, function);
-    if (function.isBuiltIn()) {
-      builtIns.add(function.getFunctionClass());
-    } else if (function.isPersistent()) {
-      Class<?> functionClass = getPermanentUdfClass(function);
-      Integer refCount = persistent.get(functionClass);
-      persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+      functionName = functionName.toLowerCase();
+      FunctionInfo prev = mFunctions.get(functionName);
+      if (prev != null) {
+        if (isBuiltInFunc(prev.getFunctionClass())) {
+          throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
+              "which cannot be overriden.");
+        }
+        prev.discarded();
+      }
+      mFunctions.put(functionName, function);
+      if (function.isBuiltIn()) {
+        builtIns.add(function.getFunctionClass());
+      } else if (function.isPersistent()) {
+        Class<?> functionClass = getPermanentUdfClass(function);
+        Integer refCount = persistent.get(functionClass);
+        persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -446,23 +486,27 @@ public class Registry {
     return functionClass;
   }
 
-  public synchronized void unregisterFunction(String functionName) throws HiveException {
-    functionName = functionName.toLowerCase();
-    FunctionInfo fi = mFunctions.get(functionName);
-    if (fi != null) {
-      if (fi.isBuiltIn()) {
-        throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
-      }
-      mFunctions.remove(functionName);
-      fi.discarded();
-      if (fi.isPersistent()) {
-        removePersistentFunction(fi);
+  public void unregisterFunction(String functionName) throws HiveException {
+    lock.lock();
+    try {
+      functionName = functionName.toLowerCase();
+      FunctionInfo fi = mFunctions.get(functionName);
+      if (fi != null) {
+        if (fi.isBuiltIn()) {
+          throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
+        }
+        mFunctions.remove(functionName);
+        fi.discarded();
+        if (fi.isPersistent()) {
+          removePersistentFunctionUnderLock(fi);
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
-  /** Should only be called from synchronized methods. */
-  private void removePersistentFunction(FunctionInfo fi) {
+  private void removePersistentFunctionUnderLock(FunctionInfo fi) {
     Class<?> functionClass = getPermanentUdfClass(fi);
     Integer refCount = persistent.get(functionClass);
     assert refCount != null;
@@ -478,10 +522,15 @@ public class Registry {
    * @param dbName database name
    * @throws HiveException
    */
-  public synchronized void unregisterFunctions(String dbName) throws HiveException {
-    Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
-    for (String funcName : funcNames) {
-      unregisterFunction(funcName);
+  public void unregisterFunctions(String dbName) throws HiveException {
+    lock.lock();
+    try {
+      Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
+      for (String funcName : funcNames) {
+        unregisterFunction(funcName);
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -493,7 +542,7 @@ public class Registry {
     return null;
   }
 
-  private FunctionInfo getQualifiedFunctionInfo(String qualifiedName) throws SemanticException {
+  private FunctionInfo getQualifiedFunctionInfoUnderLock(String qualifiedName) throws SemanticException {
     FunctionInfo info = mFunctions.get(qualifiedName);
     if (info != null && info.isBlockedFunction()) {
       throw new SemanticException ("UDF " + qualifiedName + " is not allowed");
@@ -509,7 +558,24 @@ public class Registry {
     if (isNative && info != null && info.isPersistent()) {
       return registerToSessionRegistry(qualifiedName, info);
     }
-    return info;
+    if (info != null || !isNative) {
+      return info; // We have the UDF, or we are in the session registry (or both).
+    }
+    // If we are in the system registry and this feature is enabled, try to get it from metastore.
+    SessionState ss = SessionState.get();
+    HiveConf conf = (ss == null) ? null : ss.getConf();
+    if (conf == null || !HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND)) {
+      return null;
+    }
+    // This is a little bit weird. We'll do the MS call outside of the lock. Our caller calls us
+    // under lock, so we'd preserve the lock state for them; their finally block will release the
+    // lock correctly. See the comment on the lock field - the locking needs to be reworked.
+    lock.unlock();
+    try {
+      return getFunctionInfoFromMetastoreNoLock(qualifiedName, conf);
+    } finally {
+      lock.lock();
+    }
   }
 
   // should be called after session registry is checked
@@ -545,40 +611,52 @@ public class Registry {
     return ret;
   }
 
-  private void checkFunctionClass(FunctionInfo cfi) throws ClassNotFoundException {
-    // This call will fail for non-generic UDFs using GenericUDFBridge
-    Class<?> udfClass = cfi.getFunctionClass();
-    // Even if we have a reference to the class (which will be the case for GenericUDFs),
-    // the classloader may not be able to resolve the class, which would mean reflection-based
-    // methods would fail such as for plan deserialization. Make sure this works too.
-    Class.forName(udfClass.getName(), true, Utilities.getSessionSpecifiedClassLoader());
-  }
-
-  public synchronized void clear() {
-    if (isNative) {
-      throw new IllegalStateException("System function registry cannot be cleared");
+  public void clear() {
+    lock.lock();
+    try {
+      if (isNative) {
+        throw new IllegalStateException("System function registry cannot be cleared");
+      }
+      mFunctions.clear();
+      builtIns.clear();
+      persistent.clear();
+    } finally {
+      lock.unlock();
     }
-    mFunctions.clear();
-    builtIns.clear();
-    persistent.clear();
   }
 
-  public synchronized void closeCUDFLoaders() {
+  public void closeCUDFLoaders() {
+    lock.lock();
     try {
-      for(ClassLoader loader: mSessionUDFLoaders) {
-        JavaUtils.closeClassLoader(loader);
+      try {
+        for(ClassLoader loader: mSessionUDFLoaders) {
+          JavaUtils.closeClassLoader(loader);
+        }
+      } catch (IOException ie) {
+          LOG.error("Error in close loader: " + ie);
       }
-    } catch (IOException ie) {
-        LOG.error("Error in close loader: " + ie);
+      mSessionUDFLoaders.clear();
+    } finally {
+      lock.unlock();
     }
-    mSessionUDFLoaders.clear();
   }
 
-  public synchronized void addToUDFLoaders(ClassLoader loader) {
-    mSessionUDFLoaders.add(loader);
+  public void addToUDFLoaders(ClassLoader loader) {
+    lock.lock();
+    try {
+      mSessionUDFLoaders.add(loader);
+    } finally {
+      lock.unlock();
+    }
   }
-  public synchronized void removeFromUDFLoaders(ClassLoader loader) {
-    mSessionUDFLoaders.remove(loader);
+
+  public void removeFromUDFLoaders(ClassLoader loader) {
+    lock.lock();
+    try {
+      mSessionUDFLoaders.remove(loader);
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -611,4 +689,29 @@ public class Registry {
     return blackList.contains(functionName) ||
         (!whiteList.isEmpty() && !whiteList.contains(functionName));
   }
+
+  /**
+   * This is called outside of the lock. Some of the methods that are called transitively by
+   * this (e.g. addFunction) will take the lock again and then release it, which is ok.
+   */
+  private FunctionInfo getFunctionInfoFromMetastoreNoLock(String functionName, HiveConf conf) {
+    try {
+      String[] parts = FunctionUtils.getQualifiedFunctionNameParts(functionName);
+      Function func = Hive.get(conf).getFunction(parts[0].toLowerCase(), parts[1]);
+      if (func == null) {
+        return null;
+      }
+      // Found UDF in metastore - now add it to the function registry.
+      FunctionInfo fi = registerPermanentFunction(functionName, func.getClassName(), true,
+          FunctionTask.toFunctionResource(func.getResourceUris()));
+      if (fi == null) {
+        LOG.error(func.getClassName() + " is not a valid UDF class and was not registered");
+        return null;
+      }
+      return fi;
+    } catch (Throwable e) {
+      LOG.info("Unable to look up " + functionName + " in metastore", e);
+    }
+    return null;
+  }
 }
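
A note on the locking pattern above: getQualifiedFunctionInfoUnderLock() deliberately
releases the lock around the metastore round trip and re-acquires it before returning,
so that the caller's finally block in getFunctionInfo() still unlocks a lock it holds.
Below is a self-contained sketch of that shape using a hypothetical cache class rather
than the real Registry; fetchRemote() stands in for getFunctionInfoFromMetastoreNoLock():

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical illustration, not from the patch: unlock around the slow remote
    // call while preserving the lock state the caller expects.
    class OnDemandCache {
      private final ConcurrentHashMap<String, String> local = new ConcurrentHashMap<>();
      private final ReentrantLock lock = new ReentrantLock(); // stands in for the registry's "epic lock"

      String lookup(String name) {
        lock.lock();                     // taken on entry, as in getFunctionInfo()
        try {
          String cached = local.get(name);
          if (cached != null) {
            return cached;               // fast path stays under the lock
          }
          lock.unlock();                 // drop the lock before the remote call
          try {
            return fetchRemote(name);    // may itself lock/unlock (as addFunction() does in the real code)
          } finally {
            lock.lock();                 // restore lock state for the outer finally
          }
        } finally {
          lock.unlock();
        }
      }

      private String fetchRemote(String name) {
        return null;                     // placeholder for the metastore round trip
      }
    }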

http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 672df63..d211eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -259,7 +259,7 @@ public class SessionState {
    */
   private final Set<String> preReloadableAuxJars = new HashSet<String>();
 
-  private final Registry registry = new Registry();
+  private final Registry registry;
 
   /**
    * CURRENT_TIMESTAMP value for query
@@ -352,6 +352,7 @@ public class SessionState {
   public SessionState(HiveConf conf, String userName) {
     this.sessionConf = conf;
     this.userName = userName;
+    this.registry = new Registry(false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("SessionState user: " + userName);
     }
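
For context on the SessionState change: the no-arg Registry() constructor is removed, so
the per-session registry is now constructed explicitly with isNative=false, while the
HS2-wide system registry held by FunctionRegistry is the isNative=true one that can now
consult the metastore. A two-line sketch of the distinction, assuming those constructor
semantics:

    Registry system  = new Registry(true);   // process-wide: built-ins and permanent UDFs; may now load from the metastore
    Registry session = new Registry(false);  // per SessionState: temporary functions, cleared when the session ends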