Posted to commits@hive.apache.org by go...@apache.org on 2016/03/23 12:05:10 UTC

hive git commit: HIVE-13307: LLAP: Slider package should contain permanent functions (Gopal V, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master e3ca81ea7 -> 2173e1d94


HIVE-13307: LLAP: Slider package should contain permanent functions (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2173e1d9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2173e1d9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2173e1d9

Branch: refs/heads/master
Commit: 2173e1d94d91795df02c9809324c131a2f3d6a03
Parents: e3ca81e
Author: Gopal V <go...@apache.org>
Authored: Wed Mar 23 04:04:54 2016 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed Mar 23 04:04:54 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  7 +-
 llap-server/bin/runLlapDaemon.sh                |  2 +-
 .../hadoop/hive/llap/cli/LlapServiceDriver.java | 71 ++++++++++++++++++++
 .../hive/llap/daemon/impl/LlapDaemon.java       | 14 ++--
 .../impl/StaticPermanentFunctionChecker.java    | 70 +++++++++++++++++++
 .../hive/ql/optimizer/physical/LlapDecider.java |  2 +-
 6 files changed, 156 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
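
For reference, a minimal sketch (not part of the commit; the class name PermanentFnConfigExample is invented for illustration) of how the two renamed settings are read through HiveConf. Key names and defaults come from the HiveConf.java hunk that follows: hive.llap.allow.permanent.fns defaults to true and gates the LLAP decider, while hive.llap.daemon.download.permanent.fns defaults to false and gates UDF localization in the daemon.

  import org.apache.hadoop.hive.conf.HiveConf;

  public class PermanentFnConfigExample {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      // hive.llap.allow.permanent.fns (default true): should the LLAP decider
      // admit plans that use permanent UDFs?
      boolean allowPermanentFns =
          HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS);
      // hive.llap.daemon.download.permanent.fns (default false): should the LLAP
      // daemon localize the resources for permanent UDFs itself?
      boolean daemonDownloadsFns =
          HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS);
      System.out.println("allow=" + allowPermanentFns + ", download=" + daemonDownloadsFns);
    }
  }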


http://git-wip-us.apache.org/repos/asf/hive/blob/2173e1d9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0f8d67f..c14df20 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -323,7 +323,8 @@ public class HiveConf extends Configuration {
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL.varname);
-    llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOW_PERMANENT_FNS.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname);
@@ -2576,6 +2577,8 @@ public class HiveConf extends Configuration {
         "Whether to skip the compile-time check for non-built-in UDFs when deciding whether to\n" +
         "execute tasks in LLAP. Skipping the check allows executing UDFs from pre-localized\n" +
         "jars in LLAP; if the jars are not pre-localized, the UDFs will simply fail to load."),
+    LLAP_ALLOW_PERMANENT_FNS("hive.llap.allow.permanent.fns", true,
+        "Whether LLAP decider should allow permanent UDFs."),
     LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none",
         new StringSet("auto", "none", "all", "map"),
         "Chooses whether query fragments will run in container or in llap"),
@@ -2671,7 +2674,7 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10,
       "Number of threads to use in LLAP task communicator in Tez AM.",
       "llap.daemon.communicator.num.threads"),
-    LLAP_DAEMON_ALLOW_PERMANENT_FNS("hive.llap.daemon.allow.permanent.fns", false,
+    LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false,
         "Whether LLAP daemon should localize the resources for permanent UDFs."),
     LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS(
       "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms",

http://git-wip-us.apache.org/repos/asf/hive/blob/2173e1d9/llap-server/bin/runLlapDaemon.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/runLlapDaemon.sh b/llap-server/bin/runLlapDaemon.sh
index 231bb6e..d9bfd8f 100755
--- a/llap-server/bin/runLlapDaemon.sh
+++ b/llap-server/bin/runLlapDaemon.sh
@@ -82,7 +82,7 @@ if [ ! -n "${LLAP_DAEMON_LOG_LEVEL}" ]; then
   LLAP_DAEMON_LOG_LEVEL=${LOG_LEVEL_DEFAULT}
 fi
 
-CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:${LLAP_DAEMON_HOME}/lib/tez/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:.
+CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:${LLAP_DAEMON_HOME}/lib/tez/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:${LLAP_DAEMON_HOME}/lib/udfs/*:.
 
 if [ -n "LLAP_DAEMON_USER_CLASSPATH" ]; then
   CLASSPATH=${CLASSPATH}:${LLAP_DAEMON_USER_CLASSPATH}

http://git-wip-us.apache.org/repos/asf/hive/blob/2173e1d9/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 508ce27..1f3b930 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -20,14 +20,25 @@ package org.apache.hadoop.hive.llap.cli;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
+import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -43,9 +54,18 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
+import org.apache.hadoop.hive.ql.util.ResourceDownloader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -256,6 +276,7 @@ public class LlapServiceDriver {
 
     Path libDir = new Path(tmpDir, "lib");
     Path tezDir = new Path(libDir, "tez");
+    Path udfDir = new Path(libDir, "udfs");
 
     String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
     if (tezLibs == null) {
@@ -329,6 +350,15 @@ public class LlapServiceDriver {
       }
     }
 
+    // UDFs
+    final Set<String> allowedUdfs;
+    
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
+      allowedUdfs = downloadPermanentFunctions(conf, udfDir);
+    } else {
+      allowedUdfs = Collections.emptySet();
+    }
+
     String java_home;
     if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
       java_home = System.getenv("JAVA_HOME");
@@ -369,6 +399,14 @@ public class LlapServiceDriver {
     IOUtils.copyBytes(loggerContent,
         lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
 
+    PrintWriter udfStream =
+        new PrintWriter(lfs.create(new Path(confPath, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
+    for (String udfClass : allowedUdfs) {
+      udfStream.println(udfClass);
+    }
+    
+    udfStream.close();
+
     // extract configs for processing by the python fragments in Slider
     JSONObject configs = new JSONObject();
 
@@ -419,6 +457,39 @@ public class LlapServiceDriver {
     }
   }
 
+  private Set<String> downloadPermanentFunctions(Configuration conf, Path udfDir) throws HiveException,
+      URISyntaxException, IOException {
+    Map<String,String> udfs = new HashMap<String, String>();
+    Hive hive = Hive.get(false);
+    ResourceDownloader resourceDownloader =
+        new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
+    List<Function> fns = hive.getAllFunctions();
+    Set<URI> srcUris = new HashSet<>();
+    for (Function fn : fns) {
+      String fqfn = fn.getDbName() + "." + fn.getFunctionName();
+      if (udfs.containsKey(fn.getClassName())) {
+        LOG.warn("Duplicate function names found for " + fn.getClassName() + " with " + fqfn
+            + " and " + udfs.get(fn.getClassName()));
+      }
+      udfs.put(fn.getClassName(), fqfn);
+      List<ResourceUri> resources = fn.getResourceUris();
+      if (resources == null || resources.isEmpty()) {
+        LOG.warn("Missing resources for " + fqfn);
+        continue;
+      }
+      for (ResourceUri resource : resources) {
+        srcUris.add(ResourceDownloader.createURI(resource.getUri()));
+      }
+    }
+    for (URI srcUri : srcUris) {
+      List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null, false);
+      for(URI dst : localUris) {
+        LOG.warn("Downloaded " + dst + " from " + srcUri);
+      }
+    }
+    return udfs.keySet();
+  }
+
   private void localizeJarForClass(FileSystem lfs, Path libDir, String className, boolean doThrow)
       throws IOException {
     String jarPath = null;
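
For reference, the whitelist file written above (StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST, i.e. llap-udfs.lst in the generated conf directory) holds one fully-qualified UDF class name per line, one line per permanent function found in the metastore. A hypothetical example, with class names invented for illustration:

  com.example.hive.udf.UrlNormalizer
  com.example.hive.udf.GenericUDFMaskSsn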

http://git-wip-us.apache.org/repos/asf/hive/blob/2173e1d9/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 165830c..ff9ce59 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
@@ -168,7 +169,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
     // Initialize the function localizer.
     ClassLoader executorClassLoader = null;
-    if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS)) {
+    if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS)) {
       this.fnLocalizer = new FunctionLocalizer(daemonConf, localDirs[0]);
       executorClassLoader = fnLocalizer.getClassLoader();
       // Set up the hook that will disallow creating non-whitelisted UDFs anywhere in the plan.
@@ -177,6 +178,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(fnLocalizer));
     } else {
       this.fnLocalizer = null;
+      SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(new StaticPermanentFunctionChecker(daemonConf)));
       executorClassLoader = Thread.currentThread().getContextClassLoader();
     }
 
@@ -449,9 +451,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
    * us into GenericUDFBridge-s, to check with the whitelist before instantiating a UDF.
    */
   private static final class LlapGlobalUdfChecker extends SerializationUtilities.Hook {
-    private FunctionLocalizer fnLocalizer;
-    public LlapGlobalUdfChecker(FunctionLocalizer fnLocalizer) {
-      this.fnLocalizer = fnLocalizer;
+    private UdfWhitelistChecker fnCheckerImpl;
+    public LlapGlobalUdfChecker(UdfWhitelistChecker fnCheckerImpl) {
+      this.fnCheckerImpl = fnCheckerImpl;
     }
 
     @Override
@@ -460,7 +462,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       // 2) Ignore GenericUDFBridge, it's checked separately in LlapUdfBridgeChecker.
       if (GenericUDFBridge.class == type) return true; // Run post-hook.
       if (!(GenericUDF.class.isAssignableFrom(type) || UDF.class.isAssignableFrom(type))
-          || fnLocalizer.isUdfAllowed(type)) return false;
+          || fnCheckerImpl.isUdfAllowed(type)) return false;
       throw new SecurityException("UDF " + type.getCanonicalName() + " is not allowed");
     }
 
@@ -469,7 +471,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       if (o == null) return o;
       Class<?> type = o.getClass();
       if (GenericUDFBridge.class == type)  {
-        ((GenericUDFBridge)o).setUdfChecker(fnLocalizer);
+        ((GenericUDFBridge)o).setUdfChecker(fnCheckerImpl);
       }
       // This won't usually be called otherwise.
       preRead(type);

http://git-wip-us.apache.org/repos/asf/hive/blob/2173e1d9/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java
new file mode 100644
index 0000000..15968fa
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jdi.InvocationException;
+
+public class StaticPermanentFunctionChecker implements UdfWhitelistChecker {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticPermanentFunctionChecker.class);
+
+  public static final String PERMANENT_FUNCTIONS_LIST = "llap-udfs.lst";
+
+  private final IdentityHashMap<Class<?>, Boolean> allowedUdfClasses = new IdentityHashMap<>();
+  
+  public StaticPermanentFunctionChecker(Configuration conf) {
+    URL logger = conf.getResource(PERMANENT_FUNCTIONS_LIST);
+    if (logger == null) {
+      LOG.warn("Could not find UDF whitelist in configuration: " + PERMANENT_FUNCTIONS_LIST);
+      return;
+    }
+    try {
+      BufferedReader r = new BufferedReader(new InputStreamReader(logger.openStream()));
+      String klassName = r.readLine();
+      while (klassName != null) {
+        try {
+          Class<?> clazz = Class.forName(klassName.trim(), false, this.getClass().getClassLoader());
+          allowedUdfClasses.put(clazz, true);
+          // make a list before opening the RPC attack surface
+        } catch (ClassNotFoundException ie) {
+          // note: explicit format to use Throwable instead of var-args
+          LOG.warn("Could not load class " + klassName + " declared in UDF whitelist", ie);
+        }
+        klassName = r.readLine();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Could not read UDF whitelist: " + PERMANENT_FUNCTIONS_LIST, ioe);
+    }
+  }
+
+  @Override
+  public boolean isUdfAllowed(Class<?> clazz) {
+      return FunctionRegistry.isBuiltInFuncClass(clazz) || allowedUdfClasses.containsKey(clazz);
+  }
+
+}
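
A minimal usage sketch for the checker above, assuming the generated llap-udfs.lst ends up on the daemon classpath (LlapServiceDriver writes it next to the other generated config files, and runLlapDaemon.sh puts the conf dir on the CLASSPATH); the names WhitelistCheckExample and com.example.hive.udf.UrlNormalizer are invented for illustration:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;

  public class WhitelistCheckExample {
    public static void main(String[] args) throws Exception {
      Configuration daemonConf = new Configuration();
      UdfWhitelistChecker checker = new StaticPermanentFunctionChecker(daemonConf);
      // Built-in functions are always allowed; anything else must appear in llap-udfs.lst.
      Class<?> udfClass = Class.forName("com.example.hive.udf.UrlNormalizer"); // hypothetical UDF
      if (!checker.isUdfAllowed(udfClass)) {
        throw new SecurityException("UDF " + udfClass.getCanonicalName() + " is not allowed");
      }
    }
  }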

http://git-wip-us.apache.org/repos/asf/hive/blob/2173e1d9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
index 194828f..737d9c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
@@ -107,7 +107,7 @@ public class LlapDecider implements PhysicalPlanResolver {
     public LlapDecisionDispatcher(PhysicalContext pctx, LlapMode mode) {
       conf = pctx.getConf();
       doSkipUdfCheck = HiveConf.getBoolVar(conf, ConfVars.LLAP_SKIP_COMPILE_UDF_CHECK);
-      arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS);
+      arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOW_PERMANENT_FNS);
       // Don't user uber in "all" mode - everything can go into LLAP, which is better than uber.
       shouldUber = HiveConf.getBoolVar(conf, ConfVars.LLAP_AUTO_ALLOW_UBER) && (mode != all);
     }