Posted to commits@impala.apache.org by jo...@apache.org on 2022/02/03 21:44:06 UTC

[impala] 02/03: IMPALA-10997: Refactor Java Hive UDF code.

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4734a681f3780a947dfe998f78f8b358e2f422ca
Author: Steve Carlin <sc...@cloudera.com>
AuthorDate: Thu Nov 11 11:42:41 2021 -0800

    IMPALA-10997: Refactor Java Hive UDF code.
    
    In its current form, Impala only supports Java UDFs that derive from
    UDF.class.
    
    UDF.class is legacy code; Hive now supports implementations based on
    GenericUDF.class.
    
    This refactoring will allow for easier extension to support
    GenericUDFs.
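    
    For context, a minimal sketch of the two UDF styles (the class and
    method names below are illustrative only, not part of this change):
    
        // Legacy style: derive from UDF; evaluate() overloads are found
        // via reflection.
        import org.apache.hadoop.hive.ql.exec.UDF;
        import org.apache.hadoop.io.Text;
    
        public class MyUpper extends UDF {
          public Text evaluate(Text s) {
            if (s == null) return null;
            return new Text(s.toString().toUpperCase());
          }
        }
    
        // Modern style: derive from GenericUDF; argument and return types
        // are negotiated through ObjectInspectors in initialize().
        import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
        import org.apache.hadoop.hive.ql.metadata.HiveException;
        import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
        import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
        public class MyGenericUpper extends GenericUDF {
          @Override
          public ObjectInspector initialize(ObjectInspector[] args)
              throws UDFArgumentException {
            return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
          }
    
          @Override
          public Object evaluate(DeferredObject[] args) throws HiveException {
            Object arg = args[0].get();
            return arg == null ? null : arg.toString().toUpperCase();
          }
    
          @Override
          public String getDisplayString(String[] children) {
            return "my_generic_upper(str)";
          }
        }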
    
    Among the added classes (a sketch of how they fit together follows
    the list):
    
    UdfExecutor: The entry point class, accessed directly by the backend.
    It wraps the UDF class and handles the evaluation of rows.
    
    HiveUdfExecutor: Abstract base class containing the code common to the
    legacy UDF.class and, once support is added, the GenericUDF.class.
    
    HiveUdfExecutorLegacy: Implementation of the UDF.class-specific code.
    
    HiveUdfLoader: Class responsible for using reflection to instantiate
    the UDF class.
    
    HiveJavaFunction: Interface for retrieving objects pertaining to the
    UDF function class.
    
    HiveLegacyJavaFunction: Class representing the metadata for the legacy
    UDF class.
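    
    A rough sketch of how these classes compose on the catalog side
    (mirroring the CatalogServiceCatalog change below; error handling and
    catalog-version bookkeeping elided):
    
        HiveJavaFunctionFactory factory = new HiveJavaFunctionFactoryImpl();
        // 'function' is the Hive metastore API Function; 'localLibraryPath_'
        // is a local scratch directory the jar can be copied to.
        HiveJavaFunction javaFn = factory.create(localLibraryPath_, function);
        for (ScalarFunction fn : javaFn.extract()) {
          // One Impala function per compatible evaluate() signature.
          db.addFunction(fn);
        }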
    
    Also added functionality that captures the error when a user attempts
    to create a function whose class does not exist. The unit test
    checking this uses the UDFRound function, which no longer exists in
    hive-exec.jar, so it has been moved to a new load-java-udfs-fail.test
    test file.
    
    Change-Id: Idc9572e15fbed1876412159b99dddd3fb4d37174
    Reviewed-on: http://gerrit.cloudera.org:8080/18020
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
 .../org/apache/impala/analysis/CreateUdfStmt.java  |   2 +-
 .../impala/catalog/CatalogServiceCatalog.java      |   9 +-
 .../org/apache/impala/catalog/ScalarFunction.java  |  53 --
 .../impala/hive/executor/HiveJavaFunction.java     |  64 +++
 .../hive/executor/HiveJavaFunctionFactory.java     |  41 ++
 .../hive/executor/HiveJavaFunctionFactoryImpl.java | 110 ++++
 .../hive/executor/HiveLegacyJavaFunction.java      | 274 +++++++++
 .../impala/hive/executor/HiveUdfExecutor.java      | 319 +++++++++++
 .../hive/executor/HiveUdfExecutorLegacy.java       | 131 +++++
 .../apache/impala/hive/executor/HiveUdfLoader.java | 166 ++++++
 .../impala/hive/executor/JavaUdfDataType.java      | 162 ++++++
 .../apache/impala/hive/executor/UdfExecutor.java   | 625 ++-------------------
 .../apache/impala/service/CatalogOpExecutor.java   |  26 +-
 .../java/org/apache/impala/service/JniCatalog.java |   4 +-
 .../java/org/apache/impala/util/FunctionUtils.java | 137 +----
 .../events/MetastoreEventsProcessorTest.java       |  10 +-
 .../hive/executor/HiveLegacyJavaFunctionTest.java  | 221 ++++++++
 .../hive/executor/TestHiveJavaFunctionFactory.java |  55 ++
 .../impala/hive/executor/UdfExecutorTest.java      |  12 +-
 .../impala/testutil/CatalogServiceTestCatalog.java |   7 +-
 .../impala/testutil/PlannerTestCaseLoader.java     |   4 +-
 .../queries/QueryTest/load-java-udfs-fail.test     |   9 +
 .../queries/QueryTest/load-java-udfs.test          |  16 -
 tests/query_test/test_udfs.py                      |  11 +-
 24 files changed, 1652 insertions(+), 816 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
index a488ed0..2f96f0c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
@@ -25,7 +25,7 @@ import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.hive.executor.UdfExecutor.JavaUdfDataType;
+import org.apache.impala.hive.executor.JavaUdfDataType;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TSymbolType;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index e4e4de1..83ad29b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -78,6 +78,8 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
+import org.apache.impala.hive.executor.HiveJavaFunction;
+import org.apache.impala.hive.executor.HiveJavaFunctionFactoryImpl;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -1749,9 +1751,10 @@ public class CatalogServiceCatalog extends Catalog {
     LOG.info("Loading Java functions for database: " + db.getName());
     for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
       try {
-        List<Function> fns = FunctionUtils.extractFunctions(db.getName(), function,
-            localLibraryPath_);
-        for (Function fn: fns) {
+        HiveJavaFunctionFactoryImpl factory = new HiveJavaFunctionFactoryImpl();
+        HiveJavaFunction javaFunction =
+            factory.create(localLibraryPath_, function);
+        for (Function fn: javaFunction.extract()) {
           db.addFunction(fn);
           fn.setCatalogVersion(incrementAndGetCatalogVersion());
         }
diff --git a/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java b/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
index 2f8d5ed..62b560f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
@@ -20,14 +20,9 @@ package org.apache.impala.catalog;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.HdfsUri;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.hive.executor.UdfExecutor.JavaUdfDataType;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TScalarFunction;
@@ -111,54 +106,6 @@ public class ScalarFunction extends Function {
   }
 
   /**
-   * Creates a Function object based on following inputs.
-   * @param dbName Name of fn's database
-   * @param fnName Name of the function
-   * @param fnClass Function symbol name
-   * @param fnArgs List of Class objects corresponding to the args of evaluate method
-   * @param fnRetType Class corresponding to the return type of the evaluate method
-   * @param hdfsUri URI of the jar holding the udf class.
-   * @return Function object corresponding to the hive udf if the parameters are
-   *         compatible, null otherwise.
-   */
-  public static Function fromHiveFunction(String dbName, String fnName, String fnClass,
-      Class<?>[] fnArgs, Class<?> fnRetType, String hdfsUri) {
-    // Check if the return type and the method arguments are supported.
-    // Currently we only support certain primitive types.
-    JavaUdfDataType javaRetType = JavaUdfDataType.getType(fnRetType);
-    if (javaRetType == JavaUdfDataType.INVALID_TYPE) return null;
-    List<Type> fnArgsList = new ArrayList<>();
-    for (Class<?> argClass: fnArgs) {
-      JavaUdfDataType javaUdfType = JavaUdfDataType.getType(argClass);
-      if (javaUdfType == JavaUdfDataType.INVALID_TYPE) return null;
-      fnArgsList.add(new ScalarType(
-          PrimitiveType.fromThrift(javaUdfType.getPrimitiveType())));
-    }
-    ScalarType retType = new ScalarType(
-        PrimitiveType.fromThrift(javaRetType.getPrimitiveType()));
-    ScalarFunction fn = new ScalarFunction(new FunctionName(dbName, fnName), fnArgsList,
-        retType, new HdfsUri(hdfsUri), fnClass, null, null);
-    // We do not support varargs for Java UDFs, and neither does Hive.
-    fn.setHasVarArgs(false);
-    fn.setBinaryType(TFunctionBinaryType.JAVA);
-    fn.setIsPersistent(true);
-    return fn;
-  }
-
-  /**
-   * Creates a Hive function object from 'this'. Returns null if 'this' is not
-   * a Java UDF.
-   */
-  public org.apache.hadoop.hive.metastore.api.Function toHiveFunction() {
-    if (getBinaryType() != TFunctionBinaryType.JAVA) return null;
-    List<ResourceUri> resources = Lists.newArrayList(new ResourceUri(ResourceType.JAR,
-        getLocation().toString()));
-    return new org.apache.hadoop.hive.metastore.api.Function(functionName(), dbName(),
-        symbolName_, "", PrincipalType.USER, (int) (System.currentTimeMillis() / 1000),
-        FunctionType.JAVA, resources);
-  }
-
-  /**
    * Creates a builtin scalar operator function. This is a helper that wraps a few steps
    * into one call.
    * TODO: this needs to be kept in sync with what generates the be operator
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunction.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunction.java
new file mode 100644
index 0000000..f8946da
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunction.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.thrift.TFunctionBinaryType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Interface to represent a UDF defined by Hive. Hive has different
+ * UDFClassTypes, which can be found in Hive's UDFClassType definition.
+ */
+public interface HiveJavaFunction {
+
+  /**
+   * Extract all the supported ScalarFunction objects from the Hive Java
+   * function.
+   */
+  public List<ScalarFunction> extract() throws CatalogException;
+
+  /**
+   * Get the Hive "Function" object declared by the Hive metastore API.
+   */
+  public Function getHiveFunction();
+
+  /**
+   * Helper function to convert an Impala function object into a Hive metastore
+   * API function object.
+   */
+  public static Function toHiveFunction(ScalarFunction scalarFn) {
+    Preconditions.checkState(scalarFn.getBinaryType() == TFunctionBinaryType.JAVA);
+    List<ResourceUri> resources = Lists.newArrayList(new ResourceUri(ResourceType.JAR,
+        scalarFn.getLocation().toString()));
+    return new Function(scalarFn.functionName(), scalarFn.dbName(),
+        scalarFn.getSymbolName(), "", PrincipalType.USER,
+        (int) (System.currentTimeMillis() / 1000),
+        FunctionType.JAVA, resources);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunctionFactory.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunctionFactory.java
new file mode 100644
index 0000000..159a732
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunctionFactory.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.Type;
+
+/**
+ * Factory interface for creating HiveJavaFunction instances.
+ */
+public interface HiveJavaFunctionFactory {
+  /**
+   * 'localLibPath' names a directory on the local file system to which the
+   * jar file on HDFS can be copied.
+   */
+  public HiveJavaFunction create(String localLibPath, Function hiveFn,
+      Type retType, Type[] paramTypes) throws CatalogException;
+
+  public HiveJavaFunction create(String localLibPath,
+      ScalarFunction fn) throws CatalogException;
+
+  public HiveJavaFunction create(String localLibPath, Function hiveFn)
+      throws CatalogException;
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunctionFactoryImpl.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunctionFactoryImpl.java
new file mode 100644
index 0000000..c90b09b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveJavaFunctionFactoryImpl.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.Type;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory class to create the HiveJavaFunction instance.
+ */
+public class HiveJavaFunctionFactoryImpl implements HiveJavaFunctionFactory {
+  /**
+   * 'localLibPath' names a directory on the local file system to which the
+   * jar file on HDFS can be copied.
+   */
+  public HiveJavaFunction create(String localLibPath, Function hiveFn,
+      Type retType, Type[] paramTypes) throws CatalogException {
+    checkValidFunction(hiveFn);
+    String jarUri = hiveFn.getResourceUris().get(0).getUri();
+    String fnName = hiveFn.getDbName() + "." + hiveFn.getFunctionName();
+    HiveUdfLoader javaClass = HiveUdfLoader.createWithLocalPath(localLibPath, hiveFn);
+    switch (javaClass.getUDFClassType()) {
+      case UDF:
+        return new HiveLegacyJavaFunction(javaClass.getUDFClass(), hiveFn, retType,
+            paramTypes);
+      default:
+        throw new CatalogException("Function " + fnName + ": The class "
+            + jarUri + " does not derive "
+            + "from a known supported Hive UDF class (UDF).");
+    }
+  }
+
+  public HiveJavaFunction create(String localLibPath,
+      ScalarFunction fn) throws CatalogException {
+    return create(localLibPath, HiveJavaFunction.toHiveFunction(fn),
+        fn.getReturnType(), fn.getArgs());
+  }
+
+  public HiveJavaFunction create(String localLibPath, Function hiveFn)
+      throws CatalogException {
+    return create(localLibPath, hiveFn, null, null);
+  }
+
+  /**
+   * Checks if the Hive function 'fn' is Impala compatible. A function is Impala
+   * compatible iff
+   *
+   * 1. The function is JAVA based,
+   * 2. Has exactly one binary resource associated (We don't support loading
+   *    dependencies yet) and
+   * 3. The binary is of type JAR.
+   *
+   * Throws a CatalogException with the reason for the incompatibility
+   * otherwise.
+   */
+  private void checkValidFunction(Function fn) throws CatalogException {
+    String errorPrefix = "Skipping load of incompatible function: " +
+        fn.getFunctionName() + ". ";
+    if (fn.getFunctionType() != FunctionType.JAVA) {
+      throw new CatalogException(errorPrefix + "Function type: " +
+          fn.getFunctionType().name() + " is not supported. Only " +
+          FunctionType.JAVA.name() + " functions are supported.");
+    }
+    if (fn.getResourceUrisSize() == 0) {
+      throw new CatalogException(errorPrefix + "No executable binary resource "
+          + "(like a JAR file) is associated with this function. To fix this, recreate "
+          + "the function by specifying a 'location' in the function create statement.");
+    }
+    if (fn.getResourceUrisSize() != 1) {
+      List<String> resourceUris = new ArrayList<>();
+      for (ResourceUri resource: fn.getResourceUris()) {
+        resourceUris.add(resource.getUri());
+      }
+      throw new CatalogException(errorPrefix + "Impala does not support multiple "
+          + "Jars for dependencies. (" + Joiner.on(",").join(resourceUris) + ") ");
+    }
+    if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
+      throw new CatalogException(errorPrefix + "Function binary type: " +
+        fn.getResourceUris().get(0).getResourceType().name()
+        + " is not supported. Only " + ResourceType.JAR.name()
+        + " type is supported.");
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java
new file mode 100644
index 0000000..6b5773b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.HdfsUri;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.PrimitiveType;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.thrift.TFunctionBinaryType;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+
+import org.apache.log4j.Logger;
+
+/**
+ * HiveLegacyJavaFunction instantiates the UDF object for a given class name.
+ * If a return type is passed in, it checks that the types are valid for the
+ * UDF. The "extract" method uses Java reflection to find all methods on the
+ * UDF class named "evaluate" that use supported parameter and return types.
+ */
+public class HiveLegacyJavaFunction implements HiveJavaFunction {
+  private static final Logger LOG = Logger.getLogger(HiveLegacyJavaFunction.class);
+
+  // By convention, the function in the class must be called evaluate()
+  private static final String UDF_FUNCTION_NAME = "evaluate";
+
+  private final Function hiveFn_;
+
+  private final UDF UDF_;
+
+  private final Type retType_;
+
+  private final Type[] parameterTypes_;
+
+  // Method of the function in the UDF class. If no retType is supplied, this will be
+  // null (and presumably be used for extraction of methods).
+  private final Method method_;
+
+  public HiveLegacyJavaFunction(Class<?> udfClass, Function hiveFn,
+      Type retType, Type[] parameterTypes) throws CatalogException {
+    try {
+      hiveFn_ = hiveFn;
+      retType_ = retType;
+      parameterTypes_ = parameterTypes;
+      UDF_ = instantiateUDFInstance(udfClass);
+      method_ = (retType != null && retType != ScalarType.INVALID)
+          ? getMatchingMethod(udfClass) : null;
+    } catch (CatalogException e) {
+      String errorMsg = "Error retrieving class " + udfClass + ": " + e.getMessage();
+      throw new CatalogException(errorMsg, e);
+    }
+  }
+
+  public HiveLegacyJavaFunction(Class<?> udfClass,
+      Type retType, Type[] parameterTypes) throws CatalogException {
+    this(udfClass, null, retType, parameterTypes);
+  }
+
+  @Override
+  public Function getHiveFunction() {
+    return hiveFn_;
+  }
+
+  public Method getMethod() {
+    return method_;
+  }
+
+  public Class<?> getRetType() {
+    return method_.getReturnType();
+  }
+
+  public Class<?>[] getParameterTypes() {
+    return method_.getParameterTypes();
+  }
+
+  public UDF getUDFInstance() {
+    return UDF_;
+  }
+
+  /**
+   * Instantiate and return the class given by udfClass.
+   */
+  private UDF instantiateUDFInstance(Class<?> udfClass)
+      throws CatalogException {
+    try {
+      Constructor<?> ctor = udfClass.getConstructor();
+      return (UDF) ctor.newInstance();
+    } catch (NoSuchMethodException e) {
+      throw new CatalogException(
+          "Unable to find constructor with no arguments.", e);
+    } catch (IllegalArgumentException e) {
+      throw new CatalogException(
+          "Unable to call UDF constructor with no arguments.", e);
+    } catch (InstantiationException e) {
+      throw new CatalogException("Unable to call create UDF instance.", e);
+    } catch (IllegalAccessException e) {
+      throw new CatalogException("Unable to call create UDF instance.", e);
+    } catch (InvocationTargetException e) {
+      throw new CatalogException("Unable to call create UDF instance.", e);
+    } catch (ClassCastException e) {
+      throw new CatalogException(
+          "Unable to cast to UDF instance.", e);
+    }
+  }
+
+  /**
+   * Returns a list of Impala Functions, one per compatible "evaluate" method in the
+   * UDF class referred to by the given Java function. The UDF jar referenced in the
+   * function definition is copied to a temporary file under localLibraryPath_ and
+   * loaded into the JVM. We then scan all methods in the class using reflection and
+   * create a corresponding Impala function for each compatible one. Currently Impala
+   * supports only "JAR" files for symbols, and a single jar containing all the
+   * dependent classes rather than a set of jar files.
+   */
+  @Override
+  public List<ScalarFunction> extract() throws CatalogException {
+    Set<String> addedSignatures = new HashSet<>();
+    List<ScalarFunction> result = new ArrayList<>();
+    String jarUri = hiveFn_.getResourceUris().get(0).getUri();
+    // Load each method in the UDF class and create the corresponding Impala Function
+    // object.
+    for (Method m: UDF_.getClass().getMethods()) {
+      if (m.getName().equals(UDF_FUNCTION_NAME)) {
+        ScalarFunction fn = fromHiveFunction(hiveFn_.getDbName(),
+            hiveFn_.getFunctionName(), hiveFn_.getClassName(),
+            m.getParameterTypes(), m.getReturnType(), jarUri);
+        if (fn != null) {
+          if (!addedSignatures.contains(fn.signatureString())) {
+            result.add(fn);
+            addedSignatures.add(fn.signatureString());
+          }
+        } else {
+          LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of "
+              + "Hive UDF:" + hiveFn_.getFunctionName() + " from " + UDF_.getClass());
+        }
+      }
+    }
+    if (result.isEmpty()) {
+      throw new CatalogException("No compatible function signatures found.");
+    }
+    return result;
+  }
+
+  private Method getMatchingMethod(Class<?> udfClass) throws CatalogException {
+    for (Method m : udfClass.getMethods()) {
+      if (methodMatches(m, retType_, parameterTypes_)) {
+        return m;
+      }
+    }
+    throw new CatalogException(
+        getExceptionString(udfClass.getMethods(), udfClass.toString(),
+        parameterTypes_));
+  }
+
+  private static boolean methodMatches(Method m, Type retType,
+      Type[] parameterTypes) {
+    if (!m.getName().equals(UDF_FUNCTION_NAME)) {
+      return false;
+    }
+
+    // Check if the evaluate method return type is compatible with the return type from
+    // the function definition. This happens when both of them map to the same primitive
+    // type.
+    JavaUdfDataType javaRetType = JavaUdfDataType.getType(m.getReturnType());
+    if (retType.getPrimitiveType().toThrift() != javaRetType.getPrimitiveType()) {
+      return false;
+    }
+
+    // Try to match the arguments
+    if (m.getParameterTypes().length != parameterTypes.length) {
+      return false;
+    }
+
+    for (int i = 0; i < m.getParameterTypes().length; ++i) {
+      JavaUdfDataType javaArgType =
+          JavaUdfDataType.getType(m.getParameterTypes()[i]);
+      if (javaArgType.getPrimitiveType() !=
+          parameterTypes[i].getPrimitiveType().toThrift()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static String getExceptionString(Method[] methods,
+      String className, Type[] parameterTypes) {
+    List<String> signatures = new ArrayList<>();
+    for (Method m : methods) {
+      // only include "evaluate" methods
+      if (m.getName().equals(UDF_FUNCTION_NAME)) {
+        signatures.add(m.toGenericString());
+      }
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("Unable to find evaluate function with the correct signature: ")
+      .append(className + ".evaluate(")
+      .append(Joiner.on(", ").join(parameterTypes))
+      .append(")\n")
+      .append("UDF contains: \n    ")
+      .append(Joiner.on("\n    ").join(signatures));
+    return sb.toString();
+  }
+
+  /**
+   * Creates a Function object based on the following inputs.
+   * @param dbName Name of fn's database
+   * @param fnName Name of the function
+   * @param fnClass Function symbol name
+   * @param fnArgs List of Class objects corresponding to the args of evaluate method
+   * @param fnRetType Class corresponding to the return type of the evaluate method
+   * @param hdfsUri URI of the jar holding the udf class.
+   * @return Function object corresponding to the hive udf if the parameters are
+   *         compatible, null otherwise.
+   */
+  private ScalarFunction fromHiveFunction(String dbName, String fnName, String fnClass,
+      Class<?>[] fnArgs, Class<?> fnRetType, String hdfsUri) {
+    // Check if the return type and the method arguments are supported.
+    // Currently we only support certain primitive types.
+    JavaUdfDataType javaRetType = JavaUdfDataType.getType(fnRetType);
+    if (javaRetType == JavaUdfDataType.INVALID_TYPE) {
+      LOG.debug("Processing " + fnClass + ", return type " + fnRetType +
+          " not supported.");
+      return null;
+    }
+    List<Type> fnArgsList = new ArrayList<>();
+    for (Class<?> argClass: fnArgs) {
+      JavaUdfDataType javaUdfType = JavaUdfDataType.getType(argClass);
+      if (javaUdfType == JavaUdfDataType.INVALID_TYPE) {
+        LOG.debug("Processing " + fnClass + ", param type " + argClass +
+            " not supported.");
+        return null;
+      }
+      fnArgsList.add(ScalarType.createType(
+          PrimitiveType.fromThrift(javaUdfType.getPrimitiveType())));
+    }
+    ScalarType retType = ScalarType.createType(
+        PrimitiveType.fromThrift(javaRetType.getPrimitiveType()));
+    ScalarFunction fn = new ScalarFunction(new FunctionName(dbName, fnName), fnArgsList,
+        retType, new HdfsUri(hdfsUri), fnClass, null, null);
+    // We do not support varargs for Java UDFs, and neither does Hive.
+    fn.setHasVarArgs(false);
+    fn.setBinaryType(TFunctionBinaryType.JAVA);
+    fn.setIsPersistent(true);
+    return fn;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutor.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutor.java
new file mode 100644
index 0000000..31721f6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutor.java
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.THiveUdfExecutorCtorParams;
+import org.apache.impala.util.UnsafeUtil;
+import org.apache.log4j.Logger;
+
+// Base class for running Hive UDFs. Hive supports UDFs in two ways:
+// 1) Deriving from the UDF class (legacy)
+// 2) Deriving from the GenericUDF class.
+// The base class here supports portions that are common to both implementations.
+public abstract class HiveUdfExecutor {
+
+  private static final Logger LOG = Logger.getLogger(HiveUdfExecutor.class);
+
+  // Return and argument types of the function inferred from the udf method signature.
+  // The JavaUdfDataType enum maps it to corresponding primitive type.
+  private final JavaUdfDataType[] argTypes_;
+  private final JavaUdfDataType retType_;
+
+  // Input buffer from the backend. This is valid for the duration of an evaluate() call.
+  // These buffers are allocated in the BE.
+  private final long inputBufferPtr_;
+  private final long inputNullsPtr_;
+
+  // This is the byte offset in inputBufferPtr to the start of the input argument.
+  // e.g. *inputBufferPtr_[inputBufferOffsets[i]] is the ith input argument.
+  private final int[] inputBufferOffsets_;
+
+  // Output buffer to return non-string values. These buffers are allocated in the BE.
+  private final long outputBufferPtr_;
+  private final long outputNullPtr_;
+
+  // For StringValue return types, outputBufferPtr_ is the location of the 16-byte
+  // StringValue object. StringValue.ptr is set to outBufferStringPtr_. This buffer
+  // grows as necessary to fit the return string.
+  // This is allocated from the FE.
+  private long outBufferStringPtr_;
+
+  // Size of outBufferStringPtr_.
+  private int outBufferStringCapacity_;
+
+  // Preconstructed input objects for the UDF. This minimizes object creation overhead
+  // as these objects are reused across calls to evaluate().
+  private final Writable[] inputObjects_;
+
+  protected HiveUdfExecutor(
+      THiveUdfExecutorCtorParams request,
+      JavaUdfDataType retType, JavaUdfDataType[] argTypes) throws ImpalaRuntimeException {
+    retType_ = retType;
+    argTypes_ = argTypes;
+    inputBufferPtr_ = request.input_buffer_ptr;
+    inputNullsPtr_ = request.input_nulls_ptr;
+    outputBufferPtr_ = request.output_buffer_ptr;
+    outputNullPtr_ = request.output_null_ptr;
+    outBufferStringPtr_ = 0;
+    outBufferStringCapacity_ = 0;
+    inputBufferOffsets_ = new int[request.input_byte_offsets.size()];
+    for (int i = 0; i < request.input_byte_offsets.size(); ++i) {
+      inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue();
+    }
+    inputObjects_ = new Writable[argTypes_.length];
+    allocateInputObjects();
+  }
+
+  /**
+   * Releases any resources allocated off the native heap and closes the class
+   * loader we may have created.
+   */
+  public void close() {
+    UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_);
+    outBufferStringPtr_ = 0;
+    outBufferStringCapacity_ = 0;
+    closeDerived();
+  }
+
+  /**
+   * Evaluate function called by the backend. The inputs to the UDF have
+   * been serialized to 'inputObjects_'.
+   */
+  public long evaluate() throws ImpalaRuntimeException {
+    return storeUdfResult(evaluateDerived(argTypes_, inputNullsPtr_, inputObjects_));
+  }
+
+  /**
+   * Evaluates the UDF with 'args' as the input to the UDF. This is exposed
+   * for testing and is not the version of evaluate() the backend uses.
+   */
+  public long evaluateForTesting(Object... args) throws ImpalaRuntimeException {
+    return storeUdfResult(evaluateDerived(argTypes_, inputNullsPtr_, args));
+  }
+
+  // Stores the result object 'obj' into outputBufferPtr_.
+  // Returns 0L (null) if the return value is null; otherwise returns
+  // outputBufferPtr_, which contains the long value of the pointer.
+  protected long storeUdfResult(Object obj) throws ImpalaRuntimeException {
+    if (obj == null) {
+      UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1);
+      return 0L;
+    }
+
+    UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0);
+    switch (retType_) {
+      case BOOLEAN_WRITABLE: {
+        BooleanWritable val = (BooleanWritable)obj;
+        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0);
+        return outputBufferPtr_;
+      }
+      case BOOLEAN: {
+        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (boolean)obj ? (byte)1 : 0);
+        return outputBufferPtr_;
+      }
+      case BYTE_WRITABLE: {
+        ByteWritable val = (ByteWritable)obj;
+        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get());
+        return outputBufferPtr_;
+      }
+      case TINYINT: {
+        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (byte)obj);
+        return outputBufferPtr_;
+      }
+      case SHORT_WRITABLE: {
+        ShortWritable val = (ShortWritable)obj;
+        UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get());
+        return outputBufferPtr_;
+      }
+      case SMALLINT: {
+        UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, (short)obj);
+        return outputBufferPtr_;
+      }
+      case INT_WRITABLE: {
+        IntWritable val = (IntWritable)obj;
+        UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get());
+        return outputBufferPtr_;
+      }
+      case INT: {
+        UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, (int)obj);
+        return outputBufferPtr_;
+      }
+      case LONG_WRITABLE: {
+        LongWritable val = (LongWritable)obj;
+        UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get());
+        return outputBufferPtr_;
+      }
+      case BIGINT: {
+        UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, (long)obj);
+        return outputBufferPtr_;
+      }
+      case FLOAT_WRITABLE: {
+        FloatWritable val = (FloatWritable)obj;
+        UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get());
+        return outputBufferPtr_;
+      }
+      case FLOAT: {
+        UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, (float)obj);
+        return outputBufferPtr_;
+      }
+      case DOUBLE_WRITABLE: {
+        DoubleWritable val = (DoubleWritable)obj;
+        UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get());
+        return outputBufferPtr_;
+      }
+      case DOUBLE: {
+        UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, (double)obj);
+        return outputBufferPtr_;
+      }
+      case TEXT: {
+        copyBytesToOutputBuffer(((Text)obj).copyBytes());
+        return outputBufferPtr_;
+      }
+      case BYTE_ARRAY: {
+        copyBytesToOutputBuffer((byte[]) obj);
+        return outputBufferPtr_;
+      }
+      case BYTES_WRITABLE: {
+        copyBytesToOutputBuffer(((BytesWritable)obj).copyBytes());
+        return outputBufferPtr_;
+      }
+      case STRING: {
+        copyBytesToOutputBuffer(((String)obj).getBytes());
+        return outputBufferPtr_;
+      }
+      default:
+        throw new ImpalaRuntimeException("Unsupported return type: " + retType_);
+    }
+  }
+
+  protected int getNumParams() {
+    return inputObjects_.length;
+  }
+
+  protected Object getInputObject(int i) {
+    return inputObjects_[i];
+  }
+
+  private void copyBytesToOutputBuffer(byte[] bytes) {
+    if (bytes.length > outBufferStringCapacity_) {
+      outBufferStringPtr_ =
+          UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length);
+      outBufferStringCapacity_ = bytes.length;
+      UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_);
+    }
+    UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length);
+    UnsafeUtil.UNSAFE.putInt(
+        outputBufferPtr_ + ImpalaStringWritable.STRING_VALUE_LEN_OFFSET,
+        bytes.length);
+  }
+
+  // Preallocate the input objects that will be passed to the underlying UDF.
+  // These objects are allocated once and reused across calls to evaluate()
+  private void allocateInputObjects() throws ImpalaRuntimeException {
+    for (int i = 0; i < argTypes_.length; ++i) {
+      int offset = inputBufferOffsets_[i];
+      switch (argTypes_[i]) {
+        case BOOLEAN:
+        case BOOLEAN_WRITABLE:
+          inputObjects_[i] = new ImpalaBooleanWritable(inputBufferPtr_ + offset);
+          break;
+        case TINYINT:
+        case BYTE_WRITABLE:
+          inputObjects_[i] = new ImpalaTinyIntWritable(inputBufferPtr_ + offset);
+          break;
+        case SMALLINT:
+        case SHORT_WRITABLE:
+          inputObjects_[i] = new ImpalaSmallIntWritable(inputBufferPtr_ + offset);
+          break;
+        case INT:
+        case INT_WRITABLE:
+          inputObjects_[i] = new ImpalaIntWritable(inputBufferPtr_ + offset);
+          break;
+        case BIGINT:
+        case LONG_WRITABLE:
+          inputObjects_[i] = new ImpalaBigIntWritable(inputBufferPtr_ + offset);
+          break;
+        case FLOAT:
+        case FLOAT_WRITABLE:
+          inputObjects_[i] = new ImpalaFloatWritable(inputBufferPtr_ + offset);
+          break;
+        case DOUBLE:
+        case DOUBLE_WRITABLE:
+          inputObjects_[i] = new ImpalaDoubleWritable(inputBufferPtr_ + offset);
+          break;
+        case TEXT:
+          inputObjects_[i] = new ImpalaTextWritable(inputBufferPtr_ + offset);
+          break;
+        case BYTES_WRITABLE:
+          inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
+          break;
+        case STRING:
+          // String can be mapped to any String-like Writable class.
+          inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
+          break;
+        default:
+          throw new ImpalaRuntimeException("Unsupported argument type: " + argTypes_[i]);
+      }
+    }
+  }
+
+  public static Type getRetType(THiveUdfExecutorCtorParams request) {
+    return Type.fromThrift(request.fn.ret_type);
+  }
+
+  public static Type[] getParameterTypes(THiveUdfExecutorCtorParams request) {
+    Type[] parameterTypes = new Type[request.fn.arg_types.size()];
+    for (int i = 0; i < request.fn.arg_types.size(); ++i) {
+      parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
+    }
+    return parameterTypes;
+  }
+
+  /**
+   * Abstract method allowing the derived class to deinitialize what it needs to.
+   */
+  abstract protected void closeDerived();
+
+  /**
+   * Abstract method allowing the derived class to evaluate the function.
+   */
+  abstract protected Object evaluateDerived(JavaUdfDataType[] argTypes,
+      long inputNullsPtr, Object[] inputObjectArgs) throws ImpalaRuntimeException;
+
+  /**
+   * Abstract method returning the Java reflection Method type of the 'evaluate' method.
+   */
+  abstract public Method getMethod();
+
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorLegacy.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorLegacy.java
new file mode 100644
index 0000000..0f9c593
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorLegacy.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.THiveUdfExecutorCtorParams;
+import org.apache.impala.util.UnsafeUtil;
+
+import com.google.common.base.Preconditions;
+
+// Wrapper object to run hive UDFs. This class works with UdfCallExpr in the
+// backend to marshall data back and forth between the execution engine and
+// the java UDF class.
+// See the comments in be/src/exprs/hive-udf-call.h for more details.
+// TODO: should we cache loaded jars and classes?
+@SuppressWarnings("restriction")
+public class HiveUdfExecutorLegacy extends HiveUdfExecutor {
+  // TODO: UDF is deprecated in Hive and newer built-in functions are implemented
+  // using the GenericUDF interface; we should consider supporting GenericUDFs in
+  // the future.
+  private UDF udf_;
+
+  // Set in the constructor and cleared by closeDerived().
+  private Method method_;
+
+  private final Object[] inputArgs_; // inputArgs_[i] is either inputObjects_[i] or null
+
+  /**
+   * Create a HiveUdfExecutorLegacy, using parameters from a serialized thrift object.
+   * Used by the backend.
+   */
+  public HiveUdfExecutorLegacy(THiveUdfExecutorCtorParams request,
+      HiveLegacyJavaFunction hiveJavaFn) throws ImpalaRuntimeException {
+    super(request, JavaUdfDataType.getType(hiveJavaFn.getRetType()),
+        JavaUdfDataType.getTypes(hiveJavaFn.getParameterTypes()));
+    udf_ = hiveJavaFn.getUDFInstance();
+    method_ = hiveJavaFn.getMethod();
+    inputArgs_ = new Object[getNumParams()];
+  }
+
+  /**
+   * Releases any resources allocated off the native heap and closes the class
+   * loader we may have created.
+   */
+  @Override
+  public void closeDerived() {
+    method_ = null;
+  }
+
+  /**
+   * Evaluates the UDF with 'args' as the input to the UDF.
+   * Returns the Object returned by the UDF.
+   */
+  @Override
+  protected Object evaluateDerived(JavaUdfDataType[] argTypes,
+      long inputNullsPtr, Object... inputObjects)
+      throws ImpalaRuntimeException {
+    try {
+      for (int i = 0; i < argTypes.length; ++i) {
+        if (UnsafeUtil.UNSAFE.getByte(inputNullsPtr + i) == 0) {
+          switch (argTypes[i]) {
+            case BOOLEAN_WRITABLE:
+            case BYTE_WRITABLE:
+            case SHORT_WRITABLE:
+            case INT_WRITABLE:
+            case LONG_WRITABLE:
+            case FLOAT_WRITABLE:
+            case DOUBLE_WRITABLE:
+            case BYTE_ARRAY:
+            case BYTES_WRITABLE:
+            case TEXT:
+              inputArgs_[i] = inputObjects[i];
+              break;
+            case BOOLEAN:
+              inputArgs_[i] = ((ImpalaBooleanWritable)inputObjects[i]).get();
+              break;
+            case TINYINT:
+              inputArgs_[i] = ((ImpalaTinyIntWritable)inputObjects[i]).get();
+              break;
+            case SMALLINT:
+              inputArgs_[i] = ((ImpalaSmallIntWritable)inputObjects[i]).get();
+              break;
+            case INT:
+              inputArgs_[i] = ((ImpalaIntWritable)inputObjects[i]).get();
+              break;
+            case BIGINT:
+              inputArgs_[i] = ((ImpalaBigIntWritable)inputObjects[i]).get();
+              break;
+            case FLOAT:
+              inputArgs_[i] = ((ImpalaFloatWritable)inputObjects[i]).get();
+              break;
+            case DOUBLE:
+              inputArgs_[i] = ((ImpalaDoubleWritable)inputObjects[i]).get();
+              break;
+            case STRING:
+              Preconditions.checkState(inputObjects[i] instanceof ImpalaBytesWritable);
+              inputArgs_[i] =
+                  new String(((ImpalaBytesWritable)inputObjects[i]).getBytes());
+              break;
+          }
+        } else {
+          inputArgs_[i] = null;
+        }
+      }
+      return method_.invoke(udf_, inputArgs_);
+    } catch (Exception e) {
+      e.printStackTrace(System.err);
+      throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
+    }
+  }
+
+  @Override
+  public Method getMethod() { return method_; }
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfLoader.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfLoader.java
new file mode 100644
index 0000000..15f84f5
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfLoader.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.common.FileSystemUtil;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Class responsible for the Java reflection needed to fetch the UDF
+ * class and function.
+ */
+public class HiveUdfLoader {
+  private static final Logger LOG = Logger.getLogger(HiveUdfLoader.class);
+  private final Class<?> udfClass_;
+
+  private final UDFClassType classType_;
+
+  private final ClassLoader classLoader_;
+
+  private boolean isClassLoaderClosed_;
+
+  /**
+   * Creates a HiveUdfLoader object responsible for loading a jar and class
+   * for a UDF.
+   * @param localJarPath: the file system path where the jar file is located.
+   * @param className: the class name of the UDF
+   * @param persistLoader: true if the loader needs to be kept open after loading
+   *     because more classes may be loaded later; in this case, the caller is
+   *     responsible for calling "close" on this object.
+   */
+  public HiveUdfLoader(String localJarPath, String className,
+      boolean persistLoader) throws CatalogException {
+    LOG.debug("Loading UDF '" + className + "' from " + localJarPath);
+    // If the localJarPath is not set, we use the System ClassLoader which
+    // does not need to be tracked for closing.
+    classLoader_ = getClassLoader(localJarPath);
+    udfClass_ = loadUDFClass(className, classLoader_);
+    classType_ = FunctionUtils.getUDFClassType(udfClass_);
+
+    if (!persistLoader) {
+      close();
+    }
+  }
+
+  public Class<?> getUDFClass() {
+    return udfClass_;
+  }
+
+  public void close() {
+    // We only need to close URLClassLoaders. If no jar was present at instantiation,
+    // it uses the SystemClassLoader (leaving this in for legacy purposes, but I'm not
+    // sure this is even possible).
+    if (!(classLoader_ instanceof URLClassLoader)) {
+      return;
+    }
+
+    if (isClassLoaderClosed_) {
+      return;
+    }
+
+    URLClassLoader urlClassLoader = (URLClassLoader) classLoader_;
+    try {
+      urlClassLoader.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close classloader: " + e.getMessage());
+    }
+    isClassLoaderClosed_ = true;
+  }
+
+  public UDFClassType getUDFClassType() {
+    return classType_;
+  }
+
+  private static Class<?> loadUDFClass(String className,
+      ClassLoader classLoader) throws CatalogException {
+    try {
+      return classLoader.loadClass(className);
+    } catch (ClassNotFoundException c) {
+      String errorMsg = className + " not found in Jar.";
+      throw new CatalogException(errorMsg, c);
+    } catch (LinkageError e) {
+      String errorMsg = "Error resolving dependencies.";
+      throw new CatalogException("Error resolving dependencies for class " + className
+          + ".", e);
+    } catch (Exception e) {
+      throw new CatalogException("Error loading class " + className +".", e);
+    }
+  }
+
+  private static ClassLoader getClassLoader(String jarPath) throws CatalogException {
+    try {
+      if (jarPath == null) {
+        return ClassLoader.getSystemClassLoader();
+      }
+      URL[] classLoaderUrls = new URL[] {new URL(jarPath)};
+      return new URLClassLoader(classLoaderUrls);
+    } catch (MalformedURLException e) {
+      throw new CatalogException("Unable to load jar " + jarPath, e);
+    }
+  }
+
+  public static HiveUdfLoader createWithLocalPath(String localLibPath, Function fn)
+      throws CatalogException {
+    Path localJarPath = null;
+    String fullFunctionName = fn.getDbName() + "." + fn.getFunctionName();
+    String uri = fn.getResourceUris().get(0).getUri();
+    try {
+      // TODO(todd): cache these jars based on the mtime and file ID of the
+      // remote JAR? Can we share a cache with the backend?
+      String localJarPathString = null;
+      if (uri != null) {
+        localJarPath = new Path("file://" + localLibPath,
+            UUID.randomUUID().toString() + ".jar");
+        Preconditions.checkNotNull(localJarPath);
+        try {
+          FileSystemUtil.copyToLocal(new Path(uri), localJarPath);
+        } catch (IOException e) {
+          String errorMsg = "Couldn't copy " + uri + " to local path: " +
+              localJarPath.toString();
+          LOG.error(errorMsg, e);
+          throw new CatalogException(errorMsg);
+        }
+        localJarPathString = localJarPath.toString();
+      }
+      return new HiveUdfLoader(localJarPathString, fn.getClassName(), false);
+    } catch (Exception e) {
+      String errorMsg = "Could not load class " + fn.getClassName() + " from "
+          + "jar " + uri + ": " + e.getMessage();
+      throw new CatalogException(errorMsg, e);
+    } finally {
+      if (localJarPath != null) {
+        FileSystemUtil.deleteIfExists(localJarPath);
+      }
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java b/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
new file mode 100644
index 0000000..66d000f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.thrift.TPrimitiveType;
+
+// Data types that are supported as return or argument types in Java UDFs.
+public enum JavaUdfDataType {
+  INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE),
+  BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN),
+  BOOLEAN_WRITABLE("BOOLEAN_WRITABLE", TPrimitiveType.BOOLEAN),
+  TINYINT("TINYINT", TPrimitiveType.TINYINT),
+  BYTE_WRITABLE("BYTE_WRITABLE", TPrimitiveType.TINYINT),
+  SMALLINT("SMALLINT", TPrimitiveType.SMALLINT),
+  SHORT_WRITABLE("SHORT_WRITABLE", TPrimitiveType.SMALLINT),
+  INT("INT", TPrimitiveType.INT),
+  INT_WRITABLE("INT_WRITABLE", TPrimitiveType.INT),
+  BIGINT("BIGINT", TPrimitiveType.BIGINT),
+  LONG_WRITABLE("LONG_WRITABLE", TPrimitiveType.BIGINT),
+  FLOAT("FLOAT", TPrimitiveType.FLOAT),
+  FLOAT_WRITABLE("FLOAT_WRITABLE", TPrimitiveType.FLOAT),
+  DOUBLE("DOUBLE", TPrimitiveType.DOUBLE),
+  DOUBLE_WRITABLE("DOUBLE_WRITABLE", TPrimitiveType.DOUBLE),
+  STRING("STRING", TPrimitiveType.STRING),
+  TEXT("TEXT", TPrimitiveType.STRING),
+  BYTES_WRITABLE("BYTES_WRITABLE", TPrimitiveType.STRING),
+  BYTE_ARRAY("BYTE_ARRAY", TPrimitiveType.STRING);
+
+  private final String description_;
+  private final TPrimitiveType thriftType_;
+
+  private JavaUdfDataType(String description, TPrimitiveType thriftType) {
+    description_ = description;
+    thriftType_ = thriftType;
+  }
+
+  @Override
+  public String toString() { return description_; }
+
+  public String getDescription() { return description_; }
+
+  public TPrimitiveType getPrimitiveType() { return thriftType_; }
+
+  public static JavaUdfDataType[] getTypes(Type[] typeArray) {
+    JavaUdfDataType[] types = new JavaUdfDataType[typeArray.length];
+    for (int i = 0; i < typeArray.length; ++i) {
+      types[i] = getType(typeArray[i]);
+    }
+    return types;
+  }
+
+  public static JavaUdfDataType[] getTypes(Class<?>[] typeArray) {
+    JavaUdfDataType[] types = new JavaUdfDataType[typeArray.length];
+    for (int i = 0; i < typeArray.length; ++i) {
+      types[i] = getType(typeArray[i]);
+    }
+    return types;
+  }
+
+  // Maps an Impala catalog type to the writable type handed to legacy UDFs.
+  // Returns null for unsupported types.
+  public static JavaUdfDataType getType(Type t) {
+    switch (t.getPrimitiveType().toThrift()) {
+      case BOOLEAN:
+        return JavaUdfDataType.BOOLEAN_WRITABLE;
+      case TINYINT:
+        return JavaUdfDataType.BYTE_WRITABLE;
+      case SMALLINT:
+        return JavaUdfDataType.SHORT_WRITABLE;
+      case INT:
+        return JavaUdfDataType.INT_WRITABLE;
+      case BIGINT:
+        return JavaUdfDataType.LONG_WRITABLE;
+      case FLOAT:
+        return JavaUdfDataType.FLOAT_WRITABLE;
+      case DOUBLE:
+        return JavaUdfDataType.DOUBLE_WRITABLE;
+      case STRING:
+        return JavaUdfDataType.TEXT;
+      default:
+        return null;
+    }
+  }
+
+  // Maps the Java parameter or return type of an evaluate() method to the
+  // corresponding UDF data type.
+  public static JavaUdfDataType getType(Class<?> c) {
+    if (c == BooleanWritable.class) {
+      return JavaUdfDataType.BOOLEAN_WRITABLE;
+    } else if (c == boolean.class || c == Boolean.class) {
+      return JavaUdfDataType.BOOLEAN;
+    } else if (c == ByteWritable.class) {
+      return JavaUdfDataType.BYTE_WRITABLE;
+    } else if (c == byte.class || c == Byte.class) {
+      return JavaUdfDataType.TINYINT;
+    } else if (c == ShortWritable.class) {
+      return JavaUdfDataType.SHORT_WRITABLE;
+    } else if (c == short.class || c == Short.class) {
+      return JavaUdfDataType.SMALLINT;
+    } else if (c == IntWritable.class) {
+      return JavaUdfDataType.INT_WRITABLE;
+    } else if (c == int.class || c == Integer.class) {
+      return JavaUdfDataType.INT;
+    } else if (c == LongWritable.class) {
+      return JavaUdfDataType.LONG_WRITABLE;
+    } else if (c == long.class || c == Long.class) {
+      return JavaUdfDataType.BIGINT;
+    } else if (c == FloatWritable.class) {
+      return JavaUdfDataType.FLOAT_WRITABLE;
+    } else if (c == float.class || c == Float.class) {
+      return JavaUdfDataType.FLOAT;
+    } else if (c == DoubleWritable.class) {
+      return JavaUdfDataType.DOUBLE_WRITABLE;
+    } else if (c == double.class || c == Double.class) {
+      return JavaUdfDataType.DOUBLE;
+    } else if (c == byte[].class) {
+      return JavaUdfDataType.BYTE_ARRAY;
+    } else if (c == BytesWritable.class) {
+      return JavaUdfDataType.BYTES_WRITABLE;
+    } else if (c == Text.class) {
+      return JavaUdfDataType.TEXT;
+    } else if (c == String.class) {
+      return JavaUdfDataType.STRING;
+    }
+    return JavaUdfDataType.INVALID_TYPE;
+  }
+
+  public static boolean isSupported(Type t) {
+    if (TPrimitiveType.INVALID_TYPE == t.getPrimitiveType().toThrift()) {
+      return false;
+    }
+    for (JavaUdfDataType javaType : JavaUdfDataType.values()) {
+      if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java b/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
index 0c9560c..79f225e 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
@@ -18,181 +18,33 @@
 package org.apache.impala.hive.executor;
 
 import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
 
-import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.thrift.THiveUdfExecutorCtorParams;
-import org.apache.impala.thrift.TPrimitiveType;
-import org.apache.impala.util.UnsafeUtil;
-import org.apache.log4j.Logger;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
 
 // Wrapper object to run hive UDFs. This class works with UdfCallExpr in the
 // backend to marshall data back and forth between the execution engine and
-// the java UDF class.
+// the Java UDF classes.
 // See the comments in be/src/exprs/hive-udf-call.h for more details.
 // TODO: should we cache loaded jars and classes?
 @SuppressWarnings("restriction")
 public class UdfExecutor {
   private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
-  // By convention, the function in the class must be called evaluate()
-  public static final String UDF_FUNCTION_NAME = "evaluate";
 
-  // Object to deserialize ctor params from BE.
   private final static TBinaryProtocol.Factory PROTOCOL_FACTORY =
     new TBinaryProtocol.Factory();
 
-  // TODO UDF is deprecated in Hive and newer implementation of built-in functions using
-  // GenericUDF interface, we should consider supporting GenericUDFs in the future
-  private UDF udf_;
-  // setup by init() and cleared by close()
-  private Method method_;
-  // setup by init() and cleared by close()
-  private URLClassLoader classLoader_;
-
-  // Return and argument types of the function inferred from the udf method signature.
-  // The JavaUdfDataType enum maps it to corresponding primitive type.
-  private JavaUdfDataType[] argTypes_;
-  private JavaUdfDataType retType_;
-
-  // Input buffer from the backend. This is valid for the duration of an evaluate() call.
-  // These buffers are allocated in the BE.
-  private final long inputBufferPtr_;
-  private final long inputNullsPtr_;
-
-  // This is the byte offset in inputBufferPtr to the start of the input argument.
-  // e.g. *inputBufferPtr_[inputBufferOffsets[i]] is the ith input argument.
-  private final int[] inputBufferOffsets_;
-
-  // Output buffer to return non-string values. This buffers are allocated in the BE.
-  private final long outputBufferPtr_;
-  private final long outputNullPtr_;
-
-  // For StringValue return types, outputBufferPtr_ is the location of the 16-byte
-  // StringValue object. StringValue.ptr is set to outBufferStringPtr_. This buffer
-  // grows as necessary to fit the return string.
-  // This is allocated from the FE.
-  private long outBufferStringPtr_;
-
-  // Size of outBufferStringPtr_.
-  private int outBufferCapacity_;
-
-  // Preconstructed input objects for the UDF. This minimizes object creation overhead
-  // as these objects are reused across calls to evaluate().
-  private Object[] inputObjects_;
-  private Object[] inputArgs_; // inputArgs_[i] is either inputObjects_[i] or null
-
-  // Data types that are supported as return or argument types in Java UDFs.
-  public enum JavaUdfDataType {
-    INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE),
-    BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN),
-    BOOLEAN_WRITABLE("BOOLEAN_WRITABLE", TPrimitiveType.BOOLEAN),
-    TINYINT("TINYINT", TPrimitiveType.TINYINT),
-    BYTE_WRITABLE("BYTE_WRITABLE", TPrimitiveType.TINYINT),
-    SMALLINT("SMALLINT", TPrimitiveType.SMALLINT),
-    SHORT_WRITABLE("SHORT_WRITABLE", TPrimitiveType.SMALLINT),
-    INT("INT", TPrimitiveType.INT),
-    INT_WRITABLE("INT_WRITABLE", TPrimitiveType.INT),
-    BIGINT("BIGINT", TPrimitiveType.BIGINT),
-    LONG_WRITABLE("LONG_WRITABLE", TPrimitiveType.BIGINT),
-    FLOAT("FLOAT", TPrimitiveType.FLOAT),
-    FLOAT_WRITABLE("FLOAT_WRITABLE", TPrimitiveType.FLOAT),
-    DOUBLE("DOUBLE", TPrimitiveType.DOUBLE),
-    DOUBLE_WRITABLE("DOUBLE", TPrimitiveType.DOUBLE),
-    STRING("STRING", TPrimitiveType.STRING),
-    TEXT("TEXT", TPrimitiveType.STRING),
-    BYTES_WRITABLE("BYTES_WRITABLE", TPrimitiveType.STRING),
-    BYTE_ARRAY("BYTE_ARRAY", TPrimitiveType.STRING);
-
-    private final String description_;
-    private final TPrimitiveType thriftType_;
-
-    private JavaUdfDataType(String description, TPrimitiveType thriftType) {
-      description_ = description;
-      thriftType_ = thriftType;
-    }
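+  // Executor that performs the actual row evaluation; the concrete subclass is
+  // chosen in createHiveUdfExecutor() based on which Hive UDF type was loaded.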
+  private final HiveUdfExecutor hiveUdfExecutor_;
 
-    @Override
-    public String toString() { return description_; }
-
-    public TPrimitiveType getPrimitiveType() { return thriftType_; }
-
-    public static JavaUdfDataType getType(Class<?> c) {
-      if (c == BooleanWritable.class) {
-        return JavaUdfDataType.BOOLEAN_WRITABLE;
-      } else if (c == boolean.class || c == Boolean.class) {
-        return JavaUdfDataType.BOOLEAN;
-      } else if (c == ByteWritable.class) {
-        return JavaUdfDataType.BYTE_WRITABLE;
-      } else if (c == byte.class || c == Byte.class) {
-        return JavaUdfDataType.TINYINT;
-      } else if (c == ShortWritable.class) {
-        return JavaUdfDataType.SHORT_WRITABLE;
-      } else if (c == short.class || c == Short.class) {
-        return JavaUdfDataType.SMALLINT;
-      } else if (c == IntWritable.class) {
-        return JavaUdfDataType.INT_WRITABLE;
-      } else if (c == int.class || c == Integer.class) {
-        return JavaUdfDataType.INT;
-      } else if (c == LongWritable.class) {
-        return JavaUdfDataType.LONG_WRITABLE;
-      } else if (c == long.class || c == Long.class) {
-        return JavaUdfDataType.BIGINT;
-      } else if (c == FloatWritable.class) {
-        return JavaUdfDataType.FLOAT_WRITABLE;
-      } else if (c == float.class || c == Float.class) {
-        return JavaUdfDataType.FLOAT;
-      } else if (c == DoubleWritable.class) {
-        return JavaUdfDataType.DOUBLE_WRITABLE;
-      } else if (c == double.class || c == Double.class) {
-        return JavaUdfDataType.DOUBLE;
-      } else if (c == byte[].class) {
-        return JavaUdfDataType.BYTE_ARRAY;
-      } else if (c == BytesWritable.class) {
-        return JavaUdfDataType.BYTES_WRITABLE;
-      } else if (c == Text.class) {
-        return JavaUdfDataType.TEXT;
-      } else if (c == String.class) {
-        return JavaUdfDataType.STRING;
-      }
-      return JavaUdfDataType.INVALID_TYPE;
-    }
-
-    public static boolean isSupported(Type t) {
-      for(JavaUdfDataType javaType: JavaUdfDataType.values()) {
-        if (javaType == JavaUdfDataType.INVALID_TYPE) continue;
-        if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
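+  // Loads and holds the UDF class; released together with the executor in close().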
+  private HiveUdfLoader udfLoader_;
 
   /**
    * Create a UdfExecutor, using parameters from a serialized thrift object. Used by
@@ -201,26 +53,26 @@ public class UdfExecutor {
   public UdfExecutor(byte[] thriftParams) throws ImpalaException {
     THiveUdfExecutorCtorParams request = new THiveUdfExecutorCtorParams();
     JniUtil.deserializeThrift(PROTOCOL_FACTORY, request, thriftParams);
-
-    String className = request.fn.scalar_fn.symbol;
-    String jarFile = request.local_location;
-    Type retType = Type.fromThrift(request.fn.ret_type);
-    Type[] parameterTypes = new Type[request.fn.arg_types.size()];
-    for (int i = 0; i < request.fn.arg_types.size(); ++i) {
-      parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
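+    // The backend hands us a local filesystem path; convert it to a URI string
+    // that the jar class loader can consume.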
+    String location = null;
+    try {
+      if (request.local_location != null) {
+        location = new File(request.local_location).toURI().toString();
+      }
+    } catch (Exception e) {
+      String errorMsg = "Could not load jar file " + request.local_location;
+      throw new ImpalaRuntimeException(errorMsg, e);
     }
-    inputBufferPtr_ = request.input_buffer_ptr;
-    inputNullsPtr_ = request.input_nulls_ptr;
-    outputBufferPtr_ = request.output_buffer_ptr;
-    outputNullPtr_ = request.output_null_ptr;
-    outBufferStringPtr_ = 0;
-    outBufferCapacity_ = 0;
-    inputBufferOffsets_ = new int[request.input_byte_offsets.size()];
-    for (int i = 0; i < request.input_byte_offsets.size(); ++i) {
-      inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue();
+    try {
+      checkValidRequest(request);
+      udfLoader_ = new HiveUdfLoader(location, request.fn.scalar_fn.symbol, true);
+      hiveUdfExecutor_ = createHiveUdfExecutor(request, udfLoader_);
+      LOG.debug("Loaded UDF '" + request.fn.scalar_fn.symbol + "' from "
+          + request.local_location);
+    } catch (Exception e) {
+      String errorMsg = "Could not load class " + request.fn.scalar_fn.symbol + " from "
+          + "jar " + location + ": " + e.getMessage();
+      throw new ImpalaRuntimeException(errorMsg, e);
     }
-
-    init(jarFile, className, retType, parameterTypes);
   }
 
   @Override
@@ -229,27 +81,9 @@ public class UdfExecutor {
     super.finalize();
   }
 
-  /**
-   * Releases any resources allocated off the native heap and close the class
-   * loader we may have created.
-   */
   public void close() {
-    UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_);
-    outBufferStringPtr_ = 0;
-    outBufferCapacity_ = 0;
-
-    if (classLoader_ != null) {
-      try {
-        classLoader_.close();
-      } catch (IOException e) {
-        // Log and ignore.
-        LOG.debug("Error closing the URLClassloader.", e);
-      }
-    }
-    // We are now un-usable (because the class loader has been
-    // closed), so null out method_ and classLoader_.
-    method_ = null;
-    classLoader_ = null;
+    hiveUdfExecutor_.close();
+    udfLoader_.close();
   }
 
   /**
@@ -257,58 +91,7 @@ public class UdfExecutor {
    * been serialized to 'input'
    */
   public void evaluate() throws ImpalaRuntimeException {
-    try {
-      for (int i = 0; i < argTypes_.length; ++i) {
-        if (UnsafeUtil.UNSAFE.getByte(inputNullsPtr_ + i) == 0) {
-          switch (argTypes_[i]) {
-            case BOOLEAN_WRITABLE:
-            case BYTE_WRITABLE:
-            case SHORT_WRITABLE:
-            case INT_WRITABLE:
-            case LONG_WRITABLE:
-            case FLOAT_WRITABLE:
-            case DOUBLE_WRITABLE:
-            case BYTE_ARRAY:
-            case BYTES_WRITABLE:
-            case TEXT:
-              inputArgs_[i] = inputObjects_[i];
-              break;
-            case BOOLEAN:
-              inputArgs_[i] = ((ImpalaBooleanWritable)inputObjects_[i]).get();
-              break;
-            case TINYINT:
-              inputArgs_[i] = ((ImpalaTinyIntWritable)inputObjects_[i]).get();
-              break;
-            case SMALLINT:
-              inputArgs_[i] = ((ImpalaSmallIntWritable)inputObjects_[i]).get();
-              break;
-            case INT:
-              inputArgs_[i] = ((ImpalaIntWritable)inputObjects_[i]).get();
-              break;
-            case BIGINT:
-              inputArgs_[i] = ((ImpalaBigIntWritable)inputObjects_[i]).get();
-              break;
-            case FLOAT:
-              inputArgs_[i] = ((ImpalaFloatWritable)inputObjects_[i]).get();
-              break;
-            case DOUBLE:
-              inputArgs_[i] = ((ImpalaDoubleWritable)inputObjects_[i]).get();
-              break;
-            case STRING:
-              Preconditions.checkState(inputObjects_[i] instanceof ImpalaBytesWritable);
-              inputArgs_[i] =
-                  new String(((ImpalaBytesWritable)inputObjects_[i]).getBytes());
-              break;
-          }
-        } else {
-          inputArgs_[i] = null;
-        }
-      }
-      evaluate(inputArgs_);
-    } catch (Exception e) {
-      e.printStackTrace(System.err);
-      throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
-    }
+    hiveUdfExecutor_.evaluate();
   }
 
   /**
@@ -316,350 +99,42 @@ public class UdfExecutor {
    * for testing and not the version of evaluate() the backend uses.
    */
   public long evaluateForTesting(Object... args) throws ImpalaRuntimeException {
-    try {
-      Object[] inputArgs = new Object[args.length];
-      for (int i = 0; i < args.length; ++i) {
-        switch (argTypes_[i]) {
-          case BOOLEAN_WRITABLE:
-          case BYTE_WRITABLE:
-          case SHORT_WRITABLE:
-          case INT_WRITABLE:
-          case LONG_WRITABLE:
-          case FLOAT_WRITABLE:
-          case DOUBLE_WRITABLE:
-          case TEXT:
-          case BYTE_ARRAY:
-          case BYTES_WRITABLE:
-          case STRING:
-            inputArgs[i] = args[i];
-            break;
-          case BOOLEAN:
-            inputArgs[i] = ((ImpalaBooleanWritable)args[i]).get();
-            break;
-          case TINYINT:
-            inputArgs[i] = ((ImpalaTinyIntWritable)args[i]).get();
-            break;
-          case SMALLINT:
-            inputArgs[i] = ((ImpalaSmallIntWritable)args[i]).get();
-            break;
-          case INT:
-            inputArgs[i] = ((ImpalaIntWritable)args[i]).get();
-            break;
-          case BIGINT:
-            inputArgs[i] = ((ImpalaBigIntWritable)args[i]).get();
-            break;
-          case FLOAT:
-            inputArgs[i] = ((ImpalaFloatWritable)args[i]).get();
-            break;
-          case DOUBLE:
-            inputArgs[i] = ((ImpalaDoubleWritable)args[i]).get();
-            break;
-        }
-      }
-      return evaluate(inputArgs);
-    } catch (Exception e) {
-      e.printStackTrace(System.err);
-      throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
-    }
+    return hiveUdfExecutor_.evaluateForTesting(args);
+  }
+
+  public Method getMethod() {
+    return hiveUdfExecutor_.getMethod();
   }
 
   /**
-   * Evalutes the UDF with 'args' as the input to the UDF.
-   * Returns 0 if the udf returned NULL. (the result is a ptr so this is okay).
+   * Finds the correct HiveUdfExecutor to use.
    */
-  private long evaluate(Object... args) throws ImpalaRuntimeException {
+  private HiveUdfExecutor createHiveUdfExecutor(THiveUdfExecutorCtorParams request,
+      HiveUdfLoader udfLoader) throws ImpalaRuntimeException {
     try {
-      storeUdfResult(method_.invoke(udf_, args));
-      if (UnsafeUtil.UNSAFE.getByte(outputNullPtr_) == 1) return 0;
-      return outputBufferPtr_;
-    } catch (IllegalArgumentException e) {
-      throw new ImpalaRuntimeException("UDF failed to evaluate", e);
-    } catch (IllegalAccessException e) {
-      throw new ImpalaRuntimeException("UDF failed to evaluate", e);
-    } catch (InvocationTargetException e) {
-      throw new ImpalaRuntimeException("UDF failed to evaluate", e);
-    }
-  }
-
-  public Method getMethod() { return method_; }
-
-  // Sets the result object 'obj' into the outputBufferPtr_
-  private void storeUdfResult(Object obj) throws ImpalaRuntimeException {
-    if (obj == null) {
-      UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1);
-      return;
-    }
-
-    UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0);
-    switch (retType_) {
-      case BOOLEAN_WRITABLE: {
-        BooleanWritable val = (BooleanWritable)obj;
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0);
-        return;
-      }
-      case BOOLEAN: {
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (boolean)obj ? (byte)1 : 0);
-        return;
-      }
-      case BYTE_WRITABLE: {
-        ByteWritable val = (ByteWritable)obj;
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get());
-        return;
-      }
-      case TINYINT: {
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (byte)obj);
-        return;
-      }
-      case SHORT_WRITABLE: {
-        ShortWritable val = (ShortWritable)obj;
-        UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get());
-        return;
-      }
-      case SMALLINT: {
-        UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, (short)obj);
-        return;
-      }
-      case INT_WRITABLE: {
-        IntWritable val = (IntWritable)obj;
-        UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get());
-        return;
-      }
-      case INT: {
-        UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, (int)obj);
-        return;
-      }
-      case LONG_WRITABLE: {
-        LongWritable val = (LongWritable)obj;
-        UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get());
-        return;
-      }
-      case BIGINT: {
-        UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, (long)obj);
-        return;
-      }
-      case FLOAT_WRITABLE: {
-        FloatWritable val = (FloatWritable)obj;
-        UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get());
-        return;
-      }
-      case FLOAT: {
-        UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, (float)obj);
-        return;
-      }
-      case DOUBLE_WRITABLE: {
-        DoubleWritable val = (DoubleWritable)obj;
-        UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get());
-        return;
-      }
-      case DOUBLE: {
-        UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, (double)obj);
-        return;
-      }
-      case TEXT: {
-        copyBytesToOutputBuffer(((Text)obj).copyBytes());
-        return;
-      }
-      case BYTE_ARRAY: {
-        copyBytesToOutputBuffer((byte[]) obj);
-        return;
-      }
-      case BYTES_WRITABLE: {
-        copyBytesToOutputBuffer(((BytesWritable)obj).copyBytes());
-        return;
-      }
-      case STRING: {
-        copyBytesToOutputBuffer(((String)obj).getBytes());
-        return;
-      }
-      default:
-        throw new ImpalaRuntimeException("Unsupported return type: " + retType_);
-    }
-  }
-
-  private void copyBytesToOutputBuffer(byte[] bytes) {
-    if (bytes.length > outBufferCapacity_) {
-      outBufferStringPtr_ =
-          UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length);
-      outBufferCapacity_ = bytes.length;
-      UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_);
-    }
-    UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length);
-    UnsafeUtil.UNSAFE.putInt(
-        outputBufferPtr_ + ImpalaStringWritable.STRING_VALUE_LEN_OFFSET,
-        bytes.length);
-  }
-
-  // Preallocate the input objects that will be passed to the underlying UDF.
-  // These objects are allocated once and reused across calls to evaluate()
-  private void allocateInputObjects() throws ImpalaRuntimeException {
-    inputObjects_ = new Writable[argTypes_.length];
-    inputArgs_ = new Object[argTypes_.length];
-
-    for (int i = 0; i < argTypes_.length; ++i) {
-      int offset = inputBufferOffsets_[i];
-      switch (argTypes_[i]) {
-        case BOOLEAN:
-        case BOOLEAN_WRITABLE:
-          inputObjects_[i] = new ImpalaBooleanWritable(inputBufferPtr_ + offset);
-          break;
-        case TINYINT:
-        case BYTE_WRITABLE:
-          inputObjects_[i] = new ImpalaTinyIntWritable(inputBufferPtr_ + offset);
-          break;
-        case SMALLINT:
-        case SHORT_WRITABLE:
-          inputObjects_[i] = new ImpalaSmallIntWritable(inputBufferPtr_ + offset);
-          break;
-        case INT:
-        case INT_WRITABLE:
-          inputObjects_[i] = new ImpalaIntWritable(inputBufferPtr_ + offset);
-          break;
-        case BIGINT:
-        case LONG_WRITABLE:
-          inputObjects_[i] = new ImpalaBigIntWritable(inputBufferPtr_ + offset);
-          break;
-        case FLOAT:
-        case FLOAT_WRITABLE:
-          inputObjects_[i] = new ImpalaFloatWritable(inputBufferPtr_ + offset);
-          break;
-        case DOUBLE:
-        case DOUBLE_WRITABLE:
-          inputObjects_[i] = new ImpalaDoubleWritable(inputBufferPtr_ + offset);
-          break;
-        case TEXT:
-          inputObjects_[i] = new ImpalaTextWritable(inputBufferPtr_ + offset);
-          break;
-        case BYTES_WRITABLE:
-          inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
-          break;
-        case STRING:
-          // String can be mapped to any String-like Writable class.
-          inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
-          break;
+      switch (udfLoader.getUDFClassType()) {
+        case UDF: {
+          HiveLegacyJavaFunction function =
+              new HiveLegacyJavaFunction(udfLoader.getUDFClass(),
+                  HiveUdfExecutor.getRetType(request),
+                  HiveUdfExecutor.getParameterTypes(request));
+          return new HiveUdfExecutorLegacy(request, function);
+        }
         default:
-          throw new ImpalaRuntimeException("Unsupported argument type: " + argTypes_[i]);
+          throw new ImpalaRuntimeException("The class " + request.fn.scalar_fn.symbol +
+              " does not derive " + "from a known supported Hive UDF class " +
+              "(UDF).");
       }
+    } catch (CatalogException e) {
+      throw new ImpalaRuntimeException(e.getMessage(), e);
     }
   }
 
-  private URLClassLoader getClassLoader(String jarPath) throws MalformedURLException {
-    URL url = new File(jarPath).toURI().toURL();
-    return URLClassLoader.newInstance(new URL[] { url }, getClass().getClassLoader());
-  }
-
-  /**
-   * Sets the return type of a Java UDF. Returns true if the return type is compatible
-   * with the return type from the function definition. Throws an ImpalaRuntimeException
-   * if the return type is not supported.
-   */
-  private boolean setReturnType(Type retType, Class<?> udfReturnType)
-      throws ImpalaRuntimeException {
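+  /**
+   * Validates the deserialized request before any classes are loaded. Currently
+   * this rejects return types that are not supported for Java UDFs.
+   */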
+  private void checkValidRequest(THiveUdfExecutorCtorParams request)
+      throws ImpalaRuntimeException {
+    Type retType = Type.fromThrift(request.fn.ret_type);
     if (!JavaUdfDataType.isSupported(retType)) {
       throw new ImpalaRuntimeException("Unsupported return type: " + retType.toSql());
     }
-    JavaUdfDataType javaType = JavaUdfDataType.getType(udfReturnType);
-    // Check if the evaluate method return type is compatible with the return type from
-    // the function definition. This happens when both of them map to the same primitive
-    // type.
-    if (retType.getPrimitiveType().toThrift() != javaType.getPrimitiveType()) {
-      return false;
-    }
-    retType_ = javaType;
-    return true;
-  }
-
-  /**
-   * Sets the argument types of a Java UDF. Returns true if the argument types specified
-   * in the UDF are compatible with the argument types of the evaluate() function loaded
-   * from the associated JAR file.
-   */
-  private boolean setArgTypes(Type[] parameterTypes, Class<?>[] udfArgTypes) {
-    Preconditions.checkNotNull(argTypes_);
-    for (int i = 0; i < udfArgTypes.length; ++i) {
-      argTypes_[i] = JavaUdfDataType.getType(udfArgTypes[i]);
-      if (argTypes_[i].getPrimitiveType()
-          != parameterTypes[i].getPrimitiveType().toThrift()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Initializes the UdfExecutor validating the UDF has the proper signature.
-   * This uses reflection to look up the "evaluate" function in the UDF class.
-   */
-  private void init(String jarPath, String udfPath,
-      Type retType, Type... parameterTypes) throws
-      ImpalaRuntimeException {
-    ArrayList<String> signatures = Lists.newArrayList();
-    try {
-      LOG.debug("Loading UDF '" + udfPath + "' from " + jarPath);
-      ClassLoader loader;
-      if (jarPath != null) {
-        // Save for cleanup.
-        classLoader_ = getClassLoader(jarPath);
-        loader = classLoader_;
-      } else {
-        loader = ClassLoader.getSystemClassLoader();
-      }
-      Class<?> c = Class.forName(udfPath, true, loader);
-      Class<? extends UDF> udfClass = c.asSubclass(UDF.class);
-      Constructor<? extends UDF> ctor = udfClass.getConstructor();
-      udf_ = ctor.newInstance();
-      argTypes_ = new JavaUdfDataType[parameterTypes.length];
-      Method[] methods = udfClass.getMethods();
-      for (Method m: methods) {
-        // By convention, the udf must contain the function "evaluate"
-        if (!m.getName().equals(UDF_FUNCTION_NAME)) continue;
-        signatures.add(m.toGenericString());
-        Class<?>[] methodTypes = m.getParameterTypes();
-
-        // Try to match the arguments
-        if (methodTypes.length != parameterTypes.length) continue;
-        if (methodTypes.length == 0 && parameterTypes.length == 0) {
-          // Special case where the UDF doesn't take any input args
-          method_ = m;
-          if (!setReturnType(retType, m.getReturnType())) continue;
-          setArgTypes(parameterTypes, methodTypes);
-          LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
-          return;
-        }
-
-        method_ = m;
-        if (!setReturnType(retType, m.getReturnType())) continue;
-        if (!setArgTypes(parameterTypes, methodTypes)) continue;
-        allocateInputObjects();
-        LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
-        return;
-      }
-
-      StringBuilder sb = new StringBuilder();
-      sb.append("Unable to find evaluate function with the correct signature: ")
-        .append(udfPath + ".evaluate(")
-        .append(Joiner.on(", ").join(parameterTypes))
-        .append(")\n")
-        .append("UDF contains: \n    ")
-        .append(Joiner.on("\n    ").join(signatures));
-      throw new ImpalaRuntimeException(sb.toString());
-    } catch (MalformedURLException e) {
-      throw new ImpalaRuntimeException("Unable to load jar.", e);
-    } catch (SecurityException e) {
-      throw new ImpalaRuntimeException("Unable to load function.", e);
-    } catch (ClassNotFoundException e) {
-      throw new ImpalaRuntimeException("Unable to find class.", e);
-    } catch (NoSuchMethodException e) {
-      throw new ImpalaRuntimeException(
-          "Unable to find constructor with no arguments.", e);
-    } catch (IllegalArgumentException e) {
-      throw new ImpalaRuntimeException(
-          "Unable to call UDF constructor with no arguments.", e);
-    } catch (InstantiationException e) {
-      throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
-    } catch (IllegalAccessException e) {
-      throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
-    } catch (InvocationTargetException e) {
-      throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
-    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 945da4c..20d743a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -148,6 +148,8 @@ import org.apache.impala.common.TransactionException;
 import org.apache.impala.common.TransactionKeepalive.HeartbeatContext;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
+import org.apache.impala.hive.executor.HiveJavaFunction;
+import org.apache.impala.hive.executor.HiveJavaFunctionFactory;
 import org.apache.impala.thrift.JniCatalogConstants;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TAlterDbSetOwnerParams;
@@ -228,7 +230,6 @@ import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.DebugUtils;
-import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
@@ -348,6 +349,7 @@ public class CatalogOpExecutor {
   private final CatalogServiceCatalog catalog_;
   private final AuthorizationConfig authzConfig_;
   private final AuthorizationManager authzManager_;
+  private final HiveJavaFunctionFactory hiveJavaFuncFactory_;
 
   // A singleton monitoring class that keeps track of the catalog usage metrics.
   private final CatalogOperationMetrics catalogOpMetric_ =
@@ -358,11 +360,13 @@ public class CatalogOpExecutor {
   private final ReentrantLock metastoreDdlLock_ = new ReentrantLock();
 
   public CatalogOpExecutor(CatalogServiceCatalog catalog, AuthorizationConfig authzConfig,
-      AuthorizationManager authzManager) throws ImpalaException {
+      AuthorizationManager authzManager,
+      HiveJavaFunctionFactory hiveJavaFuncFactory) throws ImpalaException {
     Preconditions.checkNotNull(authzManager);
     catalog_ = Preconditions.checkNotNull(catalog);
     authzConfig_ = Preconditions.checkNotNull(authzConfig);
     authzManager_ = Preconditions.checkNotNull(authzManager);
+    hiveJavaFuncFactory_ = Preconditions.checkNotNull(hiveJavaFuncFactory);
   }
 
   public CatalogServiceCatalog getCatalog() { return catalog_; }
@@ -2105,6 +2109,11 @@ public class CatalogOpExecutor {
     }
     boolean isPersistentJavaFn =
         (fn.getBinaryType() == TFunctionBinaryType.JAVA) && fn.isPersistent();
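+    // For Java functions, wrap the catalog function in a HiveJavaFunction, which
+    // exposes its Hive metastore representation and signature extraction.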
+    HiveJavaFunction hiveJavaFunction = (fn.getBinaryType() == TFunctionBinaryType.JAVA)
+        ? hiveJavaFuncFactory_.create(
+            BackendConfig.INSTANCE.getBackendCfg().local_library_path,
+            (ScalarFunction) fn)
+        : null;
     Db db = catalog_.getDb(fn.dbName());
     if (db == null) {
       throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
@@ -2136,16 +2145,9 @@ public class CatalogOpExecutor {
         // For persistent Java functions we extract all supported function signatures from
         // the corresponding Jar and add each signature to the catalog.
         Preconditions.checkState(fn instanceof ScalarFunction);
-        org.apache.hadoop.hive.metastore.api.Function hiveFn =
-            ((ScalarFunction) fn).toHiveFunction();
-        List<Function> funcs = FunctionUtils.extractFunctions(fn.dbName(), hiveFn,
-            BackendConfig.INSTANCE.getBackendCfg().local_library_path);
-        if (funcs.isEmpty()) {
-          throw new CatalogException(
-              "No compatible function signatures found in class: " + hiveFn
-                  .getClassName());
-        }
-        if (addJavaFunctionToHms(fn.dbName(), hiveFn, params.if_not_exists)) {
+        List<ScalarFunction> funcs = hiveJavaFunction.extract();
+        if (addJavaFunctionToHms(fn.dbName(), hiveJavaFunction.getHiveFunction(),
+            params.if_not_exists)) {
           for (Function addedFn : funcs) {
             if (LOG.isTraceEnabled()) {
               LOG.trace(String.format("Adding function: %s.%s", addedFn.dbName(),
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 179f0e1..87cbce5 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -51,6 +51,7 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
+import org.apache.impala.hive.executor.HiveJavaFunctionFactoryImpl;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
@@ -149,7 +150,8 @@ public class JniCatalog {
         cfg.local_library_path, metaStoreClientPool);
     authzManager_ = authzFactory.newAuthorizationManager(catalog_);
     catalog_.setAuthzManager(authzManager_);
-    catalogOpExecutor_ = new CatalogOpExecutor(catalog_, authzConfig, authzManager_);
+    catalogOpExecutor_ = new CatalogOpExecutor(catalog_, authzConfig, authzManager_,
+        new HiveJavaFunctionFactoryImpl());
     MetastoreEventFactory eventFactory =
         new EventFactoryForSyncToLatestEvent(catalogOpExecutor_);
     catalog_.setEventFactoryForSyncToLatestEvent(eventFactory);
diff --git a/fe/src/main/java/org/apache/impala/util/FunctionUtils.java b/fe/src/main/java/org/apache/impala/util/FunctionUtils.java
index b7f5aea..2888368 100644
--- a/fe/src/main/java/org/apache/impala/util/FunctionUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/FunctionUtils.java
@@ -28,9 +28,6 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
@@ -40,7 +37,7 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
-import org.apache.impala.hive.executor.UdfExecutor;
+import org.apache.impala.hive.executor.HiveUdfExecutorLegacy;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.log4j.Logger;
@@ -56,95 +53,6 @@ public abstract class FunctionUtils {
   public static final FunctionResolutionOrder FUNCTION_RESOLUTION_ORDER =
       new FunctionResolutionOrder();
 
-  /**
-   * Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
-   * class referred to by the given Java function. This method copies the UDF Jar
-   * referenced by "function" to a temporary file in localLibraryPath_ and loads it
-   * into the jvm. Then we scan all the methods in the class using reflection and extract
-   * those methods and create corresponding Impala functions. Currently Impala supports
-   * only "JAR" files for symbols and also a single Jar containing all the dependent
-   * classes rather than a set of Jar files.
-   */
-  public static List<Function> extractFunctions(String db,
-      org.apache.hadoop.hive.metastore.api.Function function,
-      String localLibPath)
-      throws ImpalaRuntimeException{
-    List<Function> result = Lists.newArrayList();
-    List<String> addedSignatures = Lists.newArrayList();
-    StringBuilder warnMessage = new StringBuilder();
-    if (!FunctionUtils.isFunctionCompatible(function, warnMessage)) {
-      LOG.warn("Skipping load of incompatible function: " +
-          function.getFunctionName() + ". " + warnMessage.toString());
-      return result;
-    }
-    String jarUri = function.getResourceUris().get(0).getUri();
-    Class<?> udfClass = null;
-    Path localJarPath = null;
-    try {
-      // TODO(todd): cache these jars based on the mtime and file ID of the
-      // remote JAR? Can we share a cache with the backend?
-      localJarPath = new Path("file://" + localLibPath,
-          UUID.randomUUID().toString() + ".jar");
-      try {
-        FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
-      } catch (IOException e) {
-        String errorMsg = "Error loading Java function: " + db + "." +
-            function.getFunctionName() + ". Couldn't copy " + jarUri +
-            " to local path: " + localJarPath.toString();
-        LOG.error(errorMsg, e);
-        throw new ImpalaRuntimeException(errorMsg);
-      }
-      URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
-      try (URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls)) {
-        udfClass = urlClassLoader.loadClass(function.getClassName());
-        // Check if the class is of UDF type. Currently we don't support other functions
-        // TODO: Remove this once we support Java UDAF/UDTF
-        if (org.apache.hadoop.hive.ql.exec.FunctionUtils.getUDFClassType(udfClass) !=
-            org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType.UDF) {
-          LOG.warn("Ignoring load of incompatible Java function: " +
-              function.getFunctionName() + " as " +
-              org.apache.hadoop.hive.ql.exec.FunctionUtils.getUDFClassType(udfClass)
-              + " is not a supported type. Only UDFs are supported");
-          return result;
-            }
-        // Load each method in the UDF class and create the corresponding Impala Function
-        // object.
-        for (Method m: udfClass.getMethods()) {
-          if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
-          Function fn = ScalarFunction.fromHiveFunction(db,
-              function.getFunctionName(), function.getClassName(),
-              m.getParameterTypes(), m.getReturnType(), jarUri);
-          if (fn == null) {
-            LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of "
-                + "Hive UDF:" + function.getFunctionName() + " from " + udfClass);
-            continue;
-          }
-          if (!addedSignatures.contains(fn.signatureString())) {
-            result.add(fn);
-            addedSignatures.add(fn.signatureString());
-          }
-        }
-      }
-    } catch (ClassNotFoundException c) {
-      String errorMsg = "Error loading Java function: " + db + "." +
-          function.getFunctionName() + ". Symbol class " + function.getClassName() +
-          " not found in Jar: " + jarUri;
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, c);
-    } catch (Exception e) {
-      LOG.error("Skipping function load: " + function.getFunctionName(), e);
-      throw new ImpalaRuntimeException("Error extracting functions", e);
-    } catch (LinkageError e) {
-      String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
-          function.getFunctionName();
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, e);
-    } finally {
-      if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath);
-    }
-    return result;
-  }
-
   public static List<Function> deserializeNativeFunctionsFromDbParams(
       Map<String, String> dbParams) {
     List<Function> results = Lists.newArrayList();
@@ -164,49 +72,6 @@ public abstract class FunctionUtils {
     return results;
   }
 
-  /**
-   * Checks if the Hive function 'fn' is Impala compatible. A function is Impala
-   * compatible iff
-   *
-   * 1. The function is JAVA based,
-   * 2. Has exactly one binary resource associated (We don't support loading
-   *    dependencies yet) and
-   * 3. The binary is of type JAR.
-   *
-   * Returns true if compatible and false otherwise. In case of incompatible
-   * functions 'incompatMsg' has the reason for the incompatibility.
-   * */
-   private static boolean isFunctionCompatible(
-       org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
-    boolean isCompatible = true;
-    if (fn.getFunctionType() != FunctionType.JAVA) {
-      isCompatible = false;
-      incompatMsg.append("Function type: " + fn.getFunctionType().name()
-          + " is not supported. Only " + FunctionType.JAVA.name() + " functions "
-          + "are supported.");
-    } else if (fn.getResourceUrisSize() == 0) {
-      isCompatible = false;
-      incompatMsg.append("No executable binary resource (like a JAR file) is " +
-          "associated with this function. To fix this, recreate the function by " +
-          "specifying a 'location' in the function create statement.");
-    } else if (fn.getResourceUrisSize() != 1) {
-      isCompatible = false;
-      List<String> resourceUris = Lists.newArrayList();
-      for (ResourceUri resource: fn.getResourceUris()) {
-        resourceUris.add(resource.getUri());
-      }
-      incompatMsg.append("Impala does not support multiple Jars for dependencies."
-          + "(" + Joiner.on(",").join(resourceUris) + ") ");
-    } else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
-      isCompatible = false;
-      incompatMsg.append("Function binary type: " +
-        fn.getResourceUris().get(0).getResourceType().name()
-        + " is not supported. Only " + ResourceType.JAR.name()
-        + " type is supported.");
-    }
-    return isCompatible;
-  }
-
   public static Function resolveFunction(Iterable<Function> fns, Function desc,
       CompareMode mode) {
     Preconditions.checkNotNull(fns);
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 96c84fa..42f3f4c 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -103,6 +103,8 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.common.TransactionException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.hive.executor.HiveJavaFunctionFactory;
+import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
@@ -1725,16 +1727,18 @@ public class MetastoreEventsProcessorTest {
 
     public FakeCatalogOpExecutorForTests(CatalogServiceCatalog catalog,
         AuthorizationConfig authzConfig,
-        AuthorizationManager authzManager)
+        AuthorizationManager authzManager,
+        HiveJavaFunctionFactory hiveJavaFuncFactory)
         throws ImpalaException {
-      super(catalog, authzConfig, authzManager);
+      super(catalog, authzConfig, authzManager, hiveJavaFuncFactory);
     }
 
     public static CatalogOpExecutor create() throws ImpalaException {
       return new FakeCatalogOpExecutorForTests(
           FakeCatalogServiceCatalogForFlagTests.create(),
           new NoopAuthorizationFactory().getAuthorizationConfig(),
-          new NoopAuthorizationManager());
+          new NoopAuthorizationManager(),
+          new TestHiveJavaFunctionFactory());
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/HiveLegacyJavaFunctionTest.java b/fe/src/test/java/org/apache/impala/hive/executor/HiveLegacyJavaFunctionTest.java
new file mode 100644
index 0000000..4aaa88f
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/hive/executor/HiveLegacyJavaFunctionTest.java
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.UDFAscii;
+import org.apache.hadoop.hive.ql.udf.UDFLog;
+import org.apache.hadoop.hive.ql.udf.UDFPI;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.thrift.TFunctionBinaryType;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("restriction")
+public class HiveLegacyJavaFunctionTest {
+  private static final String HIVE_BUILTIN_JAR = System.getenv("HIVE_HOME") + "/" +
+      "lib/hive-exec-" + System.getenv("IMPALA_HIVE_VERSION") + ".jar";
+
+  private static final String DB = "Db";
+  private static final String FN = "Fn";
+  private static final String JARFILE = "JarFile";
+  private static final String CLASSPATH = "ClassPath";
+
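+  // A legacy UDF with no evaluate() methods, used to exercise the extraction
+  // failure path in testExtractFailNoEvaluateMethods().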
+  public static class DummyUDF extends UDF {
+    public DummyUDF() {
+    }
+  }
+
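+  // One function signature that extract() is expected to produce, together
+  // with the catalog metadata it should carry.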
+  private class ExpectedFunction {
+    public final String className;
+    public final ScalarType retType;
+    public final List<ScalarType> paramTypes;
+    public final String db;
+    public final String fnName;
+    public final String jarFile;
+    public final String udfClassPath;
+
+    public ExpectedFunction(Class<?> udfClass, ScalarType retType,
+        List<ScalarType> paramTypes) {
+      this.className = udfClass.getSimpleName();
+      this.retType = retType;
+      this.paramTypes = new ArrayList<>(paramTypes);
+      this.db = udfClass.getSimpleName() + DB;
+      this.fnName = udfClass.getSimpleName() + FN;
+      this.jarFile = udfClass.getSimpleName() + JARFILE;
+      this.udfClassPath = udfClass.getSimpleName() + CLASSPATH;
+    }
+
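+    // Returns true if 'fn' matches this expected function's metadata and
+    // signature exactly.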
+    public boolean matches(ScalarFunction fn) {
+      if (!this.db.toLowerCase().equals(fn.dbName())) {
+        return false;
+      }
+      if (!this.fnName.toLowerCase().equals(fn.functionName())) {
+        return false;
+      }
+      if (!this.jarFile.equals(fn.getLocation().toString())) {
+        return false;
+      }
+      if (!this.udfClassPath.equals(fn.getSymbolName())) {
+        return false;
+      }
+      if (!this.retType.equals(fn.getReturnType())) {
+        return false;
+      }
+      if (this.paramTypes.size() != fn.getArgs().length) {
+        return false;
+      }
+
+      for (int i = 0; i < paramTypes.size(); ++i) {
+        if (this.paramTypes.get(i) != fn.getArgs()[i]) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  private class ExpectedFunctions {
+    public final List<ExpectedFunction> expectedFunctions = new ArrayList<>();
+
+    public void add(ExpectedFunction f) {
+      expectedFunctions.add(f);
+    }
+
+    public int size() {
+      return expectedFunctions.size();
+    }
+
+    /**
+     * Check that the extracted signatures match the expected signatures.
+     */
+    public void checkExpectedFuncs(List<ScalarFunction> scalarFns) {
+      List<ScalarFunction> scalarFnsCopy = new ArrayList<>(scalarFns);
+      for (ExpectedFunction expectedFunction : expectedFunctions) {
+        boolean found = false;
+        for (ScalarFunction scalarFn : scalarFnsCopy) {
+          if (expectedFunction.matches(scalarFn)) {
+            found = true;
+            scalarFnsCopy.remove(scalarFn);
+            break;
+          }
+        }
+        if (!found) {
+          Assert.fail("Expected function not extracted: " + expectedFunction.retType +
+              " " + expectedFunction.className + "(" +
+              Joiner.on(",").join(expectedFunction.paramTypes) + ")");
+        }
+      }
+      if (!scalarFnsCopy.isEmpty()) {
+        Assert.fail("Extracted unexpected function " +
+            scalarFnsCopy.get(0).getFunctionName() + " with signature: " +
+            scalarFnsCopy.get(0).signatureString());
+      }
+    }
+  }
+
+  @Test
+  public void testExtractFailNoEvaluateMethods() {
+    ExpectedFunctions expectedFuncs = new ExpectedFunctions();
+    Class<?> udfClass = DummyUDF.class;
+    try {
+      testScalar(udfClass, expectedFuncs);
+    } catch (CatalogException e) {
+      Assert.assertEquals("No compatible function signatures found.",
+          e.getMessage());
+      return;
+    }
+    Assert.fail("Extraction should not have succeeded.");
+  }
+
+  @Test
+  public void testExtractFailNotAUDF() {
+    ExpectedFunctions expectedFuncs = new ExpectedFunctions();
+    Class<?> udfClass = HiveLegacyJavaFunctionTest.class;
+    try {
+      testScalar(udfClass, expectedFuncs);
+    } catch (CatalogException e) {
+      Assert.assertTrue(e.getMessage().contains("Unable to cast to UDF instance."));
+      return;
+    }
+    Assert.fail("Extraction should not have succeeded.");
+  }
+
+  @Test
+  public void testExtract() throws CatalogException {
+    ExpectedFunctions expectedFuncs;
+    Class<?> udfClass;
+
+    udfClass = UDFPI.class;
+    expectedFuncs = new ExpectedFunctions();
+    expectedFuncs.add(new ExpectedFunction(udfClass, ScalarType.DOUBLE,
+        Lists.newArrayList()));
+    testScalar(udfClass, expectedFuncs);
+
+    udfClass = UDFLog.class;
+    expectedFuncs = new ExpectedFunctions();
+    expectedFuncs.add(new ExpectedFunction(udfClass, ScalarType.DOUBLE,
+        Lists.newArrayList(ScalarType.DOUBLE)));
+    expectedFuncs.add(new ExpectedFunction(udfClass, ScalarType.DOUBLE,
+        Lists.newArrayList(ScalarType.DOUBLE, ScalarType.DOUBLE)));
+    testScalar(udfClass, expectedFuncs);
+
+    udfClass = UDFAscii.class;
+    expectedFuncs = new ExpectedFunctions();
+    expectedFuncs.add(new ExpectedFunction(udfClass, ScalarType.INT,
+        Lists.newArrayList(ScalarType.STRING)));
+    testScalar(udfClass, expectedFuncs);
+  }
+
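+  // Builds a ScalarFunction around 'classToTest', extracts its signatures via
+  // HiveLegacyJavaFunction, and checks them against 'expectedFuncs'.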
+  private void testScalar(Class<?> classToTest,
+      ExpectedFunctions expectedFuncs) throws CatalogException {
+    ScalarFunction scalarFn = createScalarFn(classToTest);
+    Function hiveFn = HiveJavaFunction.toHiveFunction(scalarFn);
+    HiveJavaFunction javaFn =
+        new HiveLegacyJavaFunction(classToTest, hiveFn, null, null);
+    expectedFuncs.checkExpectedFuncs(javaFn.extract());
+  }
+
+  private ScalarFunction createScalarFn(Class<?> udfClass) {
+    String simpleName = udfClass.getSimpleName();
+    // The actual names don't matter much. We just need to make sure that the
+    // attributes of the original scalar function match the attributes of all
+    // extracted functions.
+    return ScalarFunction.createForTesting(
+        simpleName + DB,
+        simpleName + FN,
+        null,
+        null,
+        simpleName + JARFILE,
+        simpleName + CLASSPATH,
+        null,
+        null,
+        TFunctionBinaryType.JAVA);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/TestHiveJavaFunctionFactory.java b/fe/src/test/java/org/apache/impala/hive/executor/TestHiveJavaFunctionFactory.java
new file mode 100644
index 0000000..05e2f5a
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/hive/executor/TestHiveJavaFunctionFactory.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestHiveJavaFunctionFactory implements HiveJavaFunctionFactory {
+  public static class TestHiveJavaFunction implements HiveJavaFunction {
+
+    @Override
+    public List<ScalarFunction> extract() throws CatalogException {
+      return new ArrayList<>();
+    }
+
+    public Function getHiveFunction() {
+      return null;
+    }
+  }
+
+  public HiveJavaFunction create(String localLibPath, Function hiveFn,
+      Type retType, Type[] paramTypes) throws CatalogException {
+    return new TestHiveJavaFunction();
+  }
+
+  public HiveJavaFunction create(String localLibPath,
+      ScalarFunction fn) throws CatalogException {
+    return new TestHiveJavaFunction();
+  }
+
+  public HiveJavaFunction create(String localLibPath, Function hiveFn)
+      throws CatalogException {
+    return new TestHiveJavaFunction();
+  }
+}
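
TestHiveJavaFunctionFactory above is a deliberate no-op double: injecting it
into CatalogOpExecutor (see the CatalogServiceTestCatalog and
PlannerTestCaseLoader hunks below) keeps frontend tests from downloading jars
or reflecting on UDF classes. A sketch of how a consumer stays agnostic to
which factory it receives (FunctionLoaderSketch is illustrative and not part
of the patch):

    package org.apache.impala.hive.executor;

    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.Function;
    import org.apache.impala.catalog.CatalogException;
    import org.apache.impala.catalog.ScalarFunction;

    public class FunctionLoaderSketch {
      private final HiveJavaFunctionFactory factory_;

      // Production code would pass the real factory; tests pass
      // TestHiveJavaFunctionFactory, so no jar is ever touched.
      public FunctionLoaderSketch(HiveJavaFunctionFactory factory) {
        factory_ = factory;
      }

      public List<ScalarFunction> load(String localLibPath, Function hiveFn)
          throws CatalogException {
        return factory_.create(localLibPath, hiveFn).extract();
      }
    }
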
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java b/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
index fe4a1a4..a0883e2 100644
--- a/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
+++ b/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
@@ -239,6 +239,9 @@ public class UdfExecutorTest {
     TFunction fn = scalar_fn.toThrift();
 
     long inputNullsPtr = allocate(argTypes.size());
+    for (int i = 0; i < argTypes.size(); ++i) {
+      UnsafeUtil.UNSAFE.putByte(inputNullsPtr + i, (byte)0);
+    }
     long inputBufferPtr = allocate(inputBufferSize);
     long outputNullPtr = allocate(1);
     long outputBufferPtr = allocate(retType.getSlotSize());
@@ -249,8 +252,8 @@ public class UdfExecutorTest {
     return new UdfExecutor(serializer.serialize(params));
   }
 
-  // Runs the hive udf contained in c. Validates that c.evaluate(args) == retValue.
-  // Arguments and return value cannot be NULL.
+  // Runs the Hive UDF contained in c. Validates that c.evaluate(args) == expectedValue,
+  // if the "validate" argument is true. Arguments and return value cannot be NULL.
   void TestUdfImpl(String jarFile, Class<?> c, Object expectedValue,
       Type expectedType, boolean validate, Object... args)
     throws ImpalaException, MalformedURLException, TException {
@@ -264,11 +267,8 @@ public class UdfExecutorTest {
         // object here.
         if (method.getParameterTypes()[i] == Text.class) {
           inputArgs[i] = createText((String)args[i]);
-        } else if (method.getParameterTypes()[i] == BytesWritable.class) {
-          inputArgs[i] = createBytes((String)args[i]);
         } else {
-          Preconditions.checkState(method.getParameterTypes()[i] == String.class);
-          inputArgs[i] = args[i];
+          inputArgs[i] = createBytes((String)args[i]);
         }
       } else {
         inputArgs[i] = args[i];
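
The first UdfExecutorTest hunk above zero-fills the input-null indicator
bytes: allocate() returns raw memory that is not guaranteed to be zeroed, and
a stale non-zero byte would make the executor treat the corresponding
argument as NULL. The idiom in isolation (a sketch; allocate() and UnsafeUtil
are the test's existing helpers):

    // Sketch: one indicator byte per argument; 0 means "argument is not
    // NULL", any other value is read as NULL by the executor.
    long allocateZeroedNullIndicators(int numArgs) {
      long inputNullsPtr = allocate(numArgs);  // raw memory, possibly stale
      for (int i = 0; i < numArgs; ++i) {
        UnsafeUtil.UNSAFE.putByte(inputNullsPtr + i, (byte) 0);
      }
      return inputNullsPtr;
    }
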
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 573e551..b3f8e67 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -32,6 +32,7 @@ import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TUniqueId;
@@ -81,7 +82,8 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
       cs.setCatalogMetastoreServer(NoOpCatalogMetastoreServer.INSTANCE);
       cs.setCatalogOpExecutor(new CatalogOpExecutor(cs,
           new NoopAuthorizationFactory().getAuthorizationConfig(),
-          new NoopAuthorizationFactory.NoopAuthorizationManager()));
+          new NoopAuthorizationFactory.NoopAuthorizationManager(),
+          new TestHiveJavaFunctionFactory()));
       cs.setEventFactoryForSyncToLatestEvent(new EventFactoryForSyncToLatestEvent(
           cs.getCatalogOpExecutor()));
       cs.reset();
@@ -110,7 +112,8 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
     cs.setCatalogOpExecutor(new CatalogOpExecutor(cs,
         new NoopAuthorizationFactory().getAuthorizationConfig(),
-        new NoopAuthorizationFactory.NoopAuthorizationManager()));
+        new NoopAuthorizationFactory.NoopAuthorizationManager(),
+        new TestHiveJavaFunctionFactory()));
     cs.setEventFactoryForSyncToLatestEvent(
         new EventFactoryForSyncToLatestEvent(cs.getCatalogOpExecutor()));
     cs.reset();
diff --git a/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java b/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
index 204b963..3ed7d59 100644
--- a/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
+++ b/fe/src/test/java/org/apache/impala/testutil/PlannerTestCaseLoader.java
@@ -21,6 +21,7 @@ import org.apache.impala.authorization.NoopAuthorizationFactory;
 import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.thrift.TCatalogUpdateResult;
@@ -60,7 +61,8 @@ public class PlannerTestCaseLoader implements AutoCloseable {
     frontend_ = new Frontend(new NoopAuthorizationFactory(), catalog_);
     catalogOpExecutor_ = new CatalogOpExecutor(catalog_.getSrcCatalog(),
         new NoopAuthorizationFactory().getAuthorizationConfig(),
-        new NoopAuthorizationManager());
+        new NoopAuthorizationManager(),
+        new TestHiveJavaFunctionFactory());
   }
 
   public Catalog getSrcCatalog() { return catalog_.getSrcCatalog(); }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs-fail.test b/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs-fail.test
new file mode 100644
index 0000000..7dbeaf0
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs-fail.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+create function hive_round(double) returns double
+location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
+symbol='org.apache.hadoop.hive.ql.udf.UDFRound';
+---- CATCH
+org.apache.hadoop.hive.ql.udf.UDFRound not found in Jar
+====
+
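
The new .test file above depends on the patch validating the UDF symbol when
the function is created rather than when it is first invoked. A sketch of the
shape of such a check (illustrative only; in the patch this responsibility
lives in HiveUdfLoader, and the error string matches the CATCH line above):

    package org.apache.impala.hive.executor;

    import org.apache.impala.catalog.CatalogException;

    public class SymbolCheckSketch {
      // Resolving the class eagerly turns a would-be runtime crash into a
      // CREATE FUNCTION error.
      public static Class<?> loadUdfClass(String className, ClassLoader loader,
          String jarPath) throws CatalogException {
        try {
          return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
          throw new CatalogException(className + " not found in Jar " + jarPath);
        }
      }
    }
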
diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test b/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test
index da90b3c..0d02b7b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test
@@ -4,26 +4,10 @@ create function hive_pi() returns double
 location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
 symbol='org.apache.hadoop.hive.ql.udf.UDFPI';
 
-create function hive_round(double) returns double
-location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
-symbol='org.apache.hadoop.hive.ql.udf.UDFRound';
-
-create function hive_floor(double) returns bigint
-location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
-symbol='org.apache.hadoop.hive.ql.udf.UDFFloor';
-
-create function hive_mod(int, int) returns int
-location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
-symbol='org.apache.hadoop.hive.ql.udf.UDFPosMod';
-
 create function hive_bin(bigint) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
 symbol='org.apache.hadoop.hive.ql.udf.UDFBin';
 
-create function hive_lower(string) returns string
-location '$FILESYSTEM_PREFIX/test-warehouse/hive-exec.jar'
-symbol='org.apache.hadoop.hive.ql.udf.UDFLower';
-
 # Used to test persistent java functions
 create function identity_anytype
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 9829137..685dfc9 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -327,6 +327,7 @@ class TestUdfExecution(TestUdfBase):
 
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
+    self.run_test_case('QueryTest/load-java-udfs-fail', vector, use_db=unique_database)
     self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
 
   def test_udf_errors(self, vector, unique_database):
@@ -424,7 +425,7 @@ class TestUdfTargeted(TestUdfBase):
 
   def test_udf_invalid_symbol(self, vector, unique_database):
     """ IMPALA-1642: Impala crashes if the symbol for a Hive UDF doesn't exist
-        Crashing is non-deterministic so we run the UDF several times."""
+        Invalid symbols are now rejected at UDF creation time."""
     src_udf_path = os.path.join(
         os.environ['IMPALA_HOME'], 'testdata/udfs/impala-hive-udfs.jar')
     tgt_udf_path = get_fs_path(
@@ -435,15 +436,11 @@ class TestUdfTargeted(TestUdfBase):
         "create function `{0}`.fn_invalid_symbol(STRING) returns "
         "STRING LOCATION '{1}' SYMBOL='not.a.Symbol'".format(
             unique_database, tgt_udf_path))
-    query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
     self.filesystem_client.copy_from_local(src_udf_path, tgt_udf_path)
     self.client.execute(drop_fn_stmt)
-    self.client.execute(create_fn_stmt)
-    for _ in xrange(5):
-      ex = self.execute_query_expect_failure(self.client, query)
-      assert "Unable to find class" in str(ex)
-    self.client.execute(drop_fn_stmt)
+    ex = self.execute_query_expect_failure(self.client, create_fn_stmt)
+    assert "ClassNotFoundException" in str(ex)
 
   def test_hidden_symbol(self, vector, unique_database):
     """Test that symbols in the test UDFs are hidden by default and that therefore