You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:31:01 UTC
[07/11] hive git commit: HIVE-13596 : HS2 should be able to get UDFs
on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)
HIVE-13596 : HS2 should be able to get UDFs on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6460529a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6460529a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6460529a
Branch: refs/heads/llap
Commit: 6460529aa910d7dd4af499d4adf85cbcf67452ce
Parents: 1289aff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 11:01:34 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 11:01:34 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../apache/hadoop/hive/ql/exec/Registry.java | 323 ++++++++++++-------
.../hadoop/hive/ql/session/SessionState.java | 3 +-
3 files changed, 218 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index db4b9e8..46a3b96 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2322,6 +2322,9 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_BUILTIN_UDF_BLACKLIST("hive.server2.builtin.udf.blacklist", "",
"Comma separated list of udfs names. These udfs will not be allowed in queries." +
" The udf black list takes precedence over udf white list"),
+ HIVE_ALLOW_UDF_LOAD_ON_DEMAND("hive.allow.udf.load.on.demand", false,
+ "Whether enable loading UDFs from metastore on demand; this is mostly relevant for\n" +
+ "HS2 and was the default behavior before Hive 1.2. Off by default."),
HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index d5f4a37..3b54b49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -24,8 +24,12 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -46,7 +50,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
-import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -56,6 +59,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -80,14 +84,16 @@ public class Registry {
private final Set<ClassLoader> mSessionUDFLoaders = new LinkedHashSet<ClassLoader>();
private final boolean isNative;
+ /**
+ * The epic lock for the registry. This was added to replace the synchronized methods with
+ * minimum disruption; the locking should really be made more granular here.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
- Registry(boolean isNative) {
+ public Registry(boolean isNative) {
this.isNative = isNative;
}
- public Registry() {
- this(false);
- }
/**
* Registers the appropriate kind of temporary function based on a class's
@@ -157,11 +163,16 @@ public class Registry {
* Registers the UDF class as a built-in function; used for dynamically created UDFs, like
* GenericUDFOP*Minus/Plus.
*/
- public synchronized void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
- if (!isNative) {
- throw new RuntimeException("Builtin is not for this registry");
+ public void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
+ lock.lock();
+ try {
+ if (!isNative) {
+ throw new RuntimeException("Builtin is not for this registry");
+ }
+ builtIns.add(functionClass);
+ } finally {
+ lock.unlock();
}
- builtIns.add(functionClass);
}
public FunctionInfo registerGenericUDTF(String functionName,
@@ -256,23 +267,29 @@ public class Registry {
* @param functionName
* @return
*/
- public synchronized FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
- functionName = functionName.toLowerCase();
- if (FunctionUtils.isQualifiedFunctionName(functionName)) {
- return getQualifiedFunctionInfo(functionName);
- }
- // First try without qualifiers - would resolve builtin/temp functions.
- // Otherwise try qualifying with current db name.
- FunctionInfo functionInfo = mFunctions.get(functionName);
- if (functionInfo != null && functionInfo.isBlockedFunction()) {
- throw new SemanticException ("UDF " + functionName + " is not allowed");
- }
- if (functionInfo == null) {
- String qualifiedName = FunctionUtils.qualifyFunctionName(
- functionName, SessionState.get().getCurrentDatabase().toLowerCase());
- functionInfo = getQualifiedFunctionInfo(qualifiedName);
- }
+ public FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
+ lock.lock();
+ try {
+ functionName = functionName.toLowerCase();
+ if (FunctionUtils.isQualifiedFunctionName(functionName)) {
+ return getQualifiedFunctionInfoUnderLock(functionName);
+ }
+ // First try without qualifiers - would resolve builtin/temp functions.
+ // Otherwise try qualifying with current db name.
+ FunctionInfo functionInfo = mFunctions.get(functionName);
+ if (functionInfo != null && functionInfo.isBlockedFunction()) {
+ throw new SemanticException ("UDF " + functionName + " is not allowed");
+ }
+ if (functionInfo == null) {
+ String qualifiedName = FunctionUtils.qualifyFunctionName(
+ functionName, SessionState.get().getCurrentDatabase().toLowerCase());
+ functionInfo = getQualifiedFunctionInfoUnderLock(qualifiedName);
+ }
return functionInfo;
+ } finally {
+ lock.unlock();
+ }
+
}
public WindowFunctionInfo getWindowFunctionInfo(String functionName) throws SemanticException {
@@ -295,15 +312,23 @@ public class Registry {
return udfClass != null && persistent.containsKey(udfClass);
}
- public synchronized Set<String> getCurrentFunctionNames() {
- return getFunctionNames((Pattern)null);
+ public Set<String> getCurrentFunctionNames() {
+ lock.lock();
+ try {
+ return getFunctionNames((Pattern)null);
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized Set<String> getFunctionNames(String funcPatternStr) {
+ public Set<String> getFunctionNames(String funcPatternStr) {
+ lock.lock();
try {
return getFunctionNames(Pattern.compile(funcPatternStr));
} catch (PatternSyntaxException e) {
return Collections.emptySet();
+ } finally {
+ lock.unlock();
}
}
@@ -315,17 +340,22 @@ public class Registry {
* @param funcPattern regular expression of the interested function names
* @return set of strings contains function names
*/
- public synchronized Set<String> getFunctionNames(Pattern funcPattern) {
- Set<String> funcNames = new TreeSet<String>();
- for (String funcName : mFunctions.keySet()) {
- if (funcName.contains(WINDOW_FUNC_PREFIX)) {
- continue;
- }
- if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
- funcNames.add(funcName);
+ public Set<String> getFunctionNames(Pattern funcPattern) {
+ lock.lock();
+ try {
+ Set<String> funcNames = new TreeSet<String>();
+ for (String funcName : mFunctions.keySet()) {
+ if (funcName.contains(WINDOW_FUNC_PREFIX)) {
+ continue;
+ }
+ if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
+ funcNames.add(funcName);
+ }
}
+ return funcNames;
+ } finally {
+ lock.unlock();
}
- return funcNames;
}
/**
@@ -334,18 +364,23 @@ public class Registry {
* @param funcInfo
* @param synonyms
*/
- public synchronized void getFunctionSynonyms(
+ public void getFunctionSynonyms(
String funcName, FunctionInfo funcInfo, Set<String> synonyms) throws SemanticException {
- Class<?> funcClass = funcInfo.getFunctionClass();
- for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
- String name = entry.getKey();
- if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
- continue;
- }
- FunctionInfo function = entry.getValue();
- if (function.getFunctionClass() == funcClass) {
- synonyms.add(name);
+ lock.lock();
+ try {
+ Class<?> funcClass = funcInfo.getFunctionClass();
+ for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
+ String name = entry.getKey();
+ if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
+ continue;
+ }
+ FunctionInfo function = entry.getValue();
+ if (function.getFunctionClass() == funcClass) {
+ synonyms.add(name);
+ }
}
+ } finally {
+ lock.unlock();
}
}
@@ -409,26 +444,31 @@ public class Registry {
return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
}
- private synchronized void addFunction(String functionName, FunctionInfo function) {
- if (isNative ^ function.isNative()) {
- throw new RuntimeException("Function " + functionName + " is not for this registry");
- }
- functionName = functionName.toLowerCase();
- FunctionInfo prev = mFunctions.get(functionName);
- if (prev != null) {
- if (isBuiltInFunc(prev.getFunctionClass())) {
- throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
- "which cannot be overriden.");
+ private void addFunction(String functionName, FunctionInfo function) {
+ lock.lock();
+ try {
+ if (isNative != function.isNative()) {
+ throw new RuntimeException("Function " + functionName + " is not for this registry");
}
- prev.discarded();
- }
- mFunctions.put(functionName, function);
- if (function.isBuiltIn()) {
- builtIns.add(function.getFunctionClass());
- } else if (function.isPersistent()) {
- Class<?> functionClass = getPermanentUdfClass(function);
- Integer refCount = persistent.get(functionClass);
- persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+ functionName = functionName.toLowerCase();
+ FunctionInfo prev = mFunctions.get(functionName);
+ if (prev != null) {
+ if (isBuiltInFunc(prev.getFunctionClass())) {
+ throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
+ "which cannot be overriden.");
+ }
+ prev.discarded();
+ }
+ mFunctions.put(functionName, function);
+ if (function.isBuiltIn()) {
+ builtIns.add(function.getFunctionClass());
+ } else if (function.isPersistent()) {
+ Class<?> functionClass = getPermanentUdfClass(function);
+ Integer refCount = persistent.get(functionClass);
+ persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -446,23 +486,27 @@ public class Registry {
return functionClass;
}
- public synchronized void unregisterFunction(String functionName) throws HiveException {
- functionName = functionName.toLowerCase();
- FunctionInfo fi = mFunctions.get(functionName);
- if (fi != null) {
- if (fi.isBuiltIn()) {
- throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
- }
- mFunctions.remove(functionName);
- fi.discarded();
- if (fi.isPersistent()) {
- removePersistentFunction(fi);
+ public void unregisterFunction(String functionName) throws HiveException {
+ lock.lock();
+ try {
+ functionName = functionName.toLowerCase();
+ FunctionInfo fi = mFunctions.get(functionName);
+ if (fi != null) {
+ if (fi.isBuiltIn()) {
+ throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
+ }
+ mFunctions.remove(functionName);
+ fi.discarded();
+ if (fi.isPersistent()) {
+ removePersistentFunctionUnderLock(fi);
+ }
}
+ } finally {
+ lock.unlock();
}
}
- /** Should only be called from synchronized methods. */
- private void removePersistentFunction(FunctionInfo fi) {
+ private void removePersistentFunctionUnderLock(FunctionInfo fi) {
Class<?> functionClass = getPermanentUdfClass(fi);
Integer refCount = persistent.get(functionClass);
assert refCount != null;
@@ -478,10 +522,15 @@ public class Registry {
* @param dbName database name
* @throws HiveException
*/
- public synchronized void unregisterFunctions(String dbName) throws HiveException {
- Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
- for (String funcName : funcNames) {
- unregisterFunction(funcName);
+ public void unregisterFunctions(String dbName) throws HiveException {
+ lock.lock();
+ try {
+ Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
+ for (String funcName : funcNames) {
+ unregisterFunction(funcName);
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -493,7 +542,7 @@ public class Registry {
return null;
}
- private FunctionInfo getQualifiedFunctionInfo(String qualifiedName) throws SemanticException {
+ private FunctionInfo getQualifiedFunctionInfoUnderLock(String qualifiedName) throws SemanticException {
FunctionInfo info = mFunctions.get(qualifiedName);
if (info != null && info.isBlockedFunction()) {
throw new SemanticException ("UDF " + qualifiedName + " is not allowed");
@@ -509,7 +558,24 @@ public class Registry {
if (isNative && info != null && info.isPersistent()) {
return registerToSessionRegistry(qualifiedName, info);
}
- return info;
+ if (info != null || !isNative) {
+ return info; // We have the UDF, or we are in the session registry (or both).
+ }
+ // If we are in the system registry and this feature is enabled, try to get it from metastore.
+ SessionState ss = SessionState.get();
+ HiveConf conf = (ss == null) ? null : ss.getConf();
+ if (conf == null || !HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND)) {
+ return null;
+ }
+ // This is a little bit weird. We'll do the MS call outside of the lock. Our caller calls us
+ // under lock, so we'd preserve the lock state for them; their finally block will release the
+ // lock correctly. See the comment on the lock field - the locking needs to be reworked.
+ lock.unlock();
+ try {
+ return getFunctionInfoFromMetastoreNoLock(qualifiedName, conf);
+ } finally {
+ lock.lock();
+ }
}
// should be called after session registry is checked
@@ -545,40 +611,52 @@ public class Registry {
return ret;
}
- private void checkFunctionClass(FunctionInfo cfi) throws ClassNotFoundException {
- // This call will fail for non-generic UDFs using GenericUDFBridge
- Class<?> udfClass = cfi.getFunctionClass();
- // Even if we have a reference to the class (which will be the case for GenericUDFs),
- // the classloader may not be able to resolve the class, which would mean reflection-based
- // methods would fail such as for plan deserialization. Make sure this works too.
- Class.forName(udfClass.getName(), true, Utilities.getSessionSpecifiedClassLoader());
- }
-
- public synchronized void clear() {
- if (isNative) {
- throw new IllegalStateException("System function registry cannot be cleared");
+ public void clear() {
+ lock.lock();
+ try {
+ if (isNative) {
+ throw new IllegalStateException("System function registry cannot be cleared");
+ }
+ mFunctions.clear();
+ builtIns.clear();
+ persistent.clear();
+ } finally {
+ lock.unlock();
}
- mFunctions.clear();
- builtIns.clear();
- persistent.clear();
}
- public synchronized void closeCUDFLoaders() {
+ public void closeCUDFLoaders() {
+ lock.lock();
try {
- for(ClassLoader loader: mSessionUDFLoaders) {
- JavaUtils.closeClassLoader(loader);
+ try {
+ for(ClassLoader loader: mSessionUDFLoaders) {
+ JavaUtils.closeClassLoader(loader);
+ }
+ } catch (IOException ie) {
+ LOG.error("Error in close loader: " + ie);
}
- } catch (IOException ie) {
- LOG.error("Error in close loader: " + ie);
+ mSessionUDFLoaders.clear();
+ } finally {
+ lock.unlock();
}
- mSessionUDFLoaders.clear();
}
- public synchronized void addToUDFLoaders(ClassLoader loader) {
- mSessionUDFLoaders.add(loader);
+ public void addToUDFLoaders(ClassLoader loader) {
+ lock.lock();
+ try {
+ mSessionUDFLoaders.add(loader);
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized void removeFromUDFLoaders(ClassLoader loader) {
- mSessionUDFLoaders.remove(loader);
+
+ public void removeFromUDFLoaders(ClassLoader loader) {
+ lock.lock();
+ try {
+ mSessionUDFLoaders.remove(loader);
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -611,4 +689,29 @@ public class Registry {
return blackList.contains(functionName) ||
(!whiteList.isEmpty() && !whiteList.contains(functionName));
}
+
+ /**
+ * This is called outside of the lock. Some of the methods that are called transitively by
+ * this (e.g. addFunction) will take the lock again and then release it, which is ok.
+ */
+ private FunctionInfo getFunctionInfoFromMetastoreNoLock(String functionName, HiveConf conf) {
+ try {
+ String[] parts = FunctionUtils.getQualifiedFunctionNameParts(functionName);
+ Function func = Hive.get(conf).getFunction(parts[0].toLowerCase(), parts[1]);
+ if (func == null) {
+ return null;
+ }
+ // Found UDF in metastore - now add it to the function registry.
+ FunctionInfo fi = registerPermanentFunction(functionName, func.getClassName(), true,
+ FunctionTask.toFunctionResource(func.getResourceUris()));
+ if (fi == null) {
+ LOG.error(func.getClassName() + " is not a valid UDF class and was not registered");
+ return null;
+ }
+ return fi;
+ } catch (Throwable e) {
+ LOG.info("Unable to look up " + functionName + " in metastore", e);
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 672df63..d211eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -259,7 +259,7 @@ public class SessionState {
*/
private final Set<String> preReloadableAuxJars = new HashSet<String>();
- private final Registry registry = new Registry();
+ private final Registry registry;
/**
* CURRENT_TIMESTAMP value for query
@@ -352,6 +352,7 @@ public class SessionState {
public SessionState(HiveConf conf, String userName) {
this.sessionConf = conf;
this.userName = userName;
+ this.registry = new Registry(false);
if (LOG.isDebugEnabled()) {
LOG.debug("SessionState user: " + userName);
}