You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2022/12/08 17:53:17 UTC

[impala] 02/02: IMPALA-11549: Support Hive GenericUdfs that return primitive java types

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 86740a7d35790453dbf007612bf38e2e06986bf4
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Dec 1 18:49:43 2022 +0100

    IMPALA-11549: Support Hive GenericUdfs that return primitive java types
    
    Before this patch, only the Writable* types were accepted as return
    types in GenericUdfs, while some GenericUdfs in the wild return primitive
    Java types (e.g. Integer instead of IntWritable). For legacy Hive UDFs,
    these return types were already handled, so the only change needed was to
    map the ObjectInspector subclasses (e.g. JavaIntObjectInspector) to the
    correct JavaUdfDataType in Impala.
    
    Testing:
    - Added a subclass of TestGenericUdf (TestGenericUdfWithJavaReturnTypes)
      that returns primitive Java types (inheriting in the opposite direction
      would probably be more logical, but the diff is smaller this way).
    - Changed EE tests to also use TestGenericUdfWithJavaReturnTypes.
    - Changed FE tests (UdfExecutorTest) to check both
      TestGenericUdfWithJavaReturnTypes and TestGenericUdf.
    - Also added a test with BINARY type to UdfExecutorTest as this was
      forgotten during the original BINARY patch.
    
    Change-Id: I30679045d6693ebd35718b6f1a22aaa4963c1e63
    Reviewed-on: http://gerrit.cloudera.org:8080/19304
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../hive/executor/HiveGenericJavaFunction.java     |  61 ++++---
 .../hive/executor/HiveUdfExecutorGeneric.java      |   2 +-
 .../impala/hive/executor/JavaUdfDataType.java      |  36 ++++
 .../impala/hive/executor/TestGenericUdf.java       | 196 ++++++++++++++-------
 .../TestGenericUdfWithJavaReturnTypes.java         | 125 +++++++++++++
 .../impala/hive/executor/UdfExecutorTest.java      |  78 ++++----
 .../java/org/apache/impala/TestGenericUdf.java     | 189 +++++++++++++-------
 .../impala/TestGenericUdfWithJavaReturnTypes.java  | 125 +++++++++++++
 .../queries/QueryTest/generic-java-udf.test        |  84 ++++++---
 .../queries/QueryTest/load-generic-java-udfs.test  |  52 ++++++
 10 files changed, 740 insertions(+), 208 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java
index c16543cef..80a4bfdb5 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.HdfsUri;
@@ -70,6 +72,7 @@ public class HiveGenericJavaFunction implements HiveJavaFunction {
   private final Function hiveFn_;
 
   private final Type retType_;
+  private final ObjectInspector returnOi_;
 
   private final Type[] parameterTypes_;
 
@@ -83,6 +86,7 @@ public class HiveGenericJavaFunction implements HiveJavaFunction {
       retType_ = retType;
       parameterTypes_ = parameterTypes;
       genericUDF_ = createGenericUDFInstance(udfClass);
+      returnOi_ = initializeWrapper();
       checkValidFunction();
     } catch (CatalogException e) {
       String errorMsg = "Error retrieving class " + udfClass + ": " + e.getMessage();
@@ -120,6 +124,10 @@ public class HiveGenericJavaFunction implements HiveJavaFunction {
     return retType_;
   }
 
+  public ObjectInspector getReturnObjectInspector() {
+    return returnOi_;
+  }
+
   public Type[] getParameterTypes() {
     return parameterTypes_;
   }
@@ -141,51 +149,62 @@ public class HiveGenericJavaFunction implements HiveJavaFunction {
   }
 
   private void checkValidFunction() throws CatalogException {
+    if (returnOi_ != getInspector(retType_, true)
+        && returnOi_ != getInspector(retType_, false)
+        && !returnOi_.getTypeName().equals("void")) {
+      throw new CatalogException("Function expected return type " +
+          returnOi_.getTypeName() + " but was created with " + retType_);
+    }
+  }
+
+  private ObjectInspector initializeWrapper() throws CatalogException {
+    ObjectInspector[] parameterOIs = getInspectors(parameterTypes_, true);
     try {
-      ObjectInspector[] parameterOIs = getInspectors(parameterTypes_);
-      // Call the initialize method which will give us the return type that
-      // the GenericUDF produces. Then we check if it matches what we expect.
-      ObjectInspector returnOI = genericUDF_.initialize(parameterOIs);
-      if (returnOI != getInspector(retType_) && !returnOI.getTypeName().equals("void")) {
-        throw new CatalogException("Function expected return type " +
-            returnOI.getTypeName() + " but was created with " + retType_);
-      }
+      return genericUDF_.initialize(parameterOIs);
     } catch (UDFArgumentException e) {
-      LOG.error(e.getMessage());
+      LOG.info("GenericUDF initialization failed: " + e.getMessage());
       throw new CatalogException("Function cannot be created with the following " +
           "parameters: (" + Joiner.on(",").join(parameterTypes_) + "). ");
     }
   }
 
-  private ObjectInspector[] getInspectors(Type[] typeArray)
+  private ObjectInspector[] getInspectors(Type[] typeArray, boolean useWritable)
       throws CatalogException {
     ObjectInspector[] OIArray = new ObjectInspector[typeArray.length];
     for (int i = 0; i < typeArray.length; ++i) {
-      OIArray[i] = getInspector(typeArray[i]);
+      OIArray[i] = getInspector(typeArray[i], useWritable);
     }
     return OIArray;
   }
 
-  private ObjectInspector getInspector(Type t) throws CatalogException {
+  private ObjectInspector getInspector(Type t, boolean useWritable)
+      throws CatalogException {
+    PrimitiveCategory cat = getPrimitiveCategory(t);
+    return useWritable
+        ? PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(cat)
+        : PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(cat);
+  }
+
+  private PrimitiveCategory getPrimitiveCategory(Type t) throws CatalogException {
     switch (t.getPrimitiveType().toThrift()) {
       case BOOLEAN:
-        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
+        return PrimitiveCategory.BOOLEAN;
       case TINYINT:
-        return PrimitiveObjectInspectorFactory.writableByteObjectInspector;
+        return PrimitiveCategory.BYTE;
       case SMALLINT:
-        return PrimitiveObjectInspectorFactory.writableShortObjectInspector;
+        return PrimitiveCategory.SHORT;
       case INT:
-        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+        return PrimitiveCategory.INT;
       case BIGINT:
-        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+        return PrimitiveCategory.LONG;
       case FLOAT:
-        return PrimitiveObjectInspectorFactory.writableFloatObjectInspector;
+        return PrimitiveCategory.FLOAT;
       case DOUBLE:
-        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+        return PrimitiveCategory.DOUBLE;
       case STRING:
-        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+        return PrimitiveCategory.STRING;
       case BINARY:
-        return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+        return PrimitiveCategory.BINARY;
       default:
         throw new CatalogException("Unsupported type: " + t);
     }
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorGeneric.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorGeneric.java
index de26ccd27..45d9c8f55 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorGeneric.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveUdfExecutorGeneric.java
@@ -83,7 +83,7 @@ public class HiveUdfExecutorGeneric extends HiveUdfExecutor {
    */
   public HiveUdfExecutorGeneric(THiveUdfExecutorCtorParams request,
       HiveGenericJavaFunction hiveJavaFn) throws ImpalaRuntimeException {
-    super(request, JavaUdfDataType.getType(hiveJavaFn.getRetType()),
+    super(request, JavaUdfDataType.getType(hiveJavaFn.getReturnObjectInspector()),
         JavaUdfDataType.getTypes(hiveJavaFn.getParameterTypes()));
     genericUDF_ = hiveJavaFn.getGenericUDFInstance();
     deferredParameters_ = createDeferredObjects();
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java b/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
index 43410f553..d0824cfbe 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
@@ -20,6 +20,10 @@ package org.apache.impala.hive.executor;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -30,6 +34,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.thrift.TPrimitiveType;
 
+import com.google.common.base.Preconditions;
+
   // Data types that are supported as return or argument types in Java UDFs.
   public enum JavaUdfDataType {
     INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE),
@@ -108,6 +114,36 @@ import org.apache.impala.thrift.TPrimitiveType;
       }
     }
 
+    public static JavaUdfDataType getType(ObjectInspector oi) {
+      // Only primitive objects are supported currently.
+      Preconditions.checkState(oi instanceof PrimitiveObjectInspector);
+      PrimitiveObjectInspector primOi = (PrimitiveObjectInspector) oi;
+      PrimitiveCategory cat = primOi.getPrimitiveCategory();
+      boolean writable = primOi.preferWritable();
+      switch (cat) {
+        case BOOLEAN:
+          return writable ? JavaUdfDataType.BOOLEAN_WRITABLE : JavaUdfDataType.BOOLEAN;
+        case BYTE:
+          return writable ? JavaUdfDataType.BYTE_WRITABLE : JavaUdfDataType.TINYINT;
+        case SHORT:
+          return writable ? JavaUdfDataType.SHORT_WRITABLE : JavaUdfDataType.SMALLINT;
+        case INT:
+          return writable ? JavaUdfDataType.INT_WRITABLE : JavaUdfDataType.INT;
+        case LONG:
+          return writable ? JavaUdfDataType.LONG_WRITABLE : JavaUdfDataType.BIGINT;
+        case FLOAT:
+          return writable ? JavaUdfDataType.FLOAT_WRITABLE : JavaUdfDataType.FLOAT;
+        case DOUBLE:
+          return writable ? JavaUdfDataType.DOUBLE_WRITABLE : JavaUdfDataType.DOUBLE;
+        case STRING:
+          return writable ? JavaUdfDataType.TEXT : JavaUdfDataType.STRING;
+        case BINARY:
+          return writable ? JavaUdfDataType.BYTES_WRITABLE : JavaUdfDataType.BYTE_ARRAY;
+        default:
+          return null;
+      }
+    }
+
     public static JavaUdfDataType getType(Class<?> c) {
       if (c == BooleanWritable.class) {
         return JavaUdfDataType.BOOLEAN_WRITABLE;
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java b/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java
index 3097878e9..22ec3b09d 100644
--- a/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java
+++ b/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -46,27 +47,29 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+
 /**
  * Simple Generic UDFs for testing.
  *
  * Udf that takes a variable number of arguments of the same type and applies
- * the "+" operator to them. The "+" is a concatenation for string types. For
- * boolean types, it applies the OR operation. If only one argument is provided,
- * it returns that argument.
+ * the "+" operator to them. The "+" is a concatenation for string and binary types.
+ * For boolean types, it applies the OR operation. If only one argument is provided,
+ * it returns that argument. If any argument is NULL, it returns NULL.
+ *
+ * For all argument types the return type is a Writable class (e.g. IntWritable).
+ * Generic UDFs can also return Java primitive classes (e.g. Integer). A separate
+ * UDF class (TestGenericUdfWithJavaReturnTypes) provides the same behavior as
+ * this one but with different return types.
  *
  * This class is duplicated in fe and java/test-hive-udfs. We need this class in a
  * separate project so we can test loading UDF jars that are not already on the
  * classpath, and we can't delete the FE's class because UdfExecutorTest depends
  * on it.
  *
- * The jar for this file can be built by running "mvn clean package" in
- * tests/test-hive-udfs. This is run in testdata/bin/create-load-data.sh, and
- * copied to HDFS in testdata/bin/copy-udfs-uda.sh.
- *
  */
 public class TestGenericUdf extends GenericUDF {
 
-  private List<PrimitiveCategory> inputTypes_ = new ArrayList<>();
+  private List<PrimitiveCategory> inputTypes_;
   private PrimitiveObjectInspector retTypeOI_;
   private PrimitiveCategory argAndRetType_;
 
@@ -94,6 +97,8 @@ public class TestGenericUdf extends GenericUDF {
       throw new UDFArgumentException("No arguments provided.");
     }
 
+    // Resetting here as initialize can be called more than once by Hive.
+    inputTypes_  = new ArrayList<>();
     for (ObjectInspector oi : arguments) {
       if (!(oi instanceof PrimitiveObjectInspector)) {
         throw new UDFArgumentException("Found an input that is not a primitive.");
@@ -102,8 +107,8 @@ public class TestGenericUdf extends GenericUDF {
       inputTypes_.add(poi.getPrimitiveCategory());
     }
 
-    // return type is always same as last argument
-    retTypeOI_ = (PrimitiveObjectInspector) arguments[0];
+    // return type is always same as first argument
+    retTypeOI_ = getReturnObjectInspector((PrimitiveObjectInspector) arguments[0]);
 
     argAndRetType_ = retTypeOI_.getPrimitiveCategory();
 
@@ -111,32 +116,40 @@ public class TestGenericUdf extends GenericUDF {
     return retTypeOI_;
   }
 
+  protected PrimitiveObjectInspector getReturnObjectInspector(
+        PrimitiveObjectInspector oi) {
+    // Simply returns the same object inspector. Subclasses can override this to return
+    // different types of object inspectors.
+    return oi;
+  }
+
   @Override
   public Object evaluate(DeferredObject[] arguments)
       throws HiveException {
     if (arguments.length != inputTypes_.size()) {
       throw new HiveException("Number of arguments passed in did not match number of " +
-          "arguments expected.");
+          "arguments expected. Expected: "
+              + inputTypes_.size() + " actual: " +  arguments.length);
     }
     switch (argAndRetType_) {
       case BOOLEAN:
-        return evaluateBoolean(arguments);
+        return evaluateBooleanWrapped(arguments);
       case BYTE:
-        return evaluateByte(arguments);
+        return evaluateByteWrapped(arguments);
       case SHORT:
-        return evaluateShort(arguments);
+        return evaluateShortWrapped(arguments);
       case INT:
-        return evaluateInt(arguments);
+        return evaluateIntWrapped(arguments);
       case LONG:
-        return evaluateLong(arguments);
+        return evaluateLongWrapped(arguments);
       case FLOAT:
-        return evaluateFloat(arguments);
+        return evaluateFloatWrapped(arguments);
       case DOUBLE:
-        return evaluateDouble(arguments);
+        return evaluateDoubleWrapped(arguments);
       case STRING:
-        return evaluateString(arguments);
+        return evaluateStringWrapped(arguments);
       case BINARY:
-        return evaluateBinary(arguments);
+        return evaluateBinaryWrapped(arguments);
       case DATE:
       case TIMESTAMP:
       default:
@@ -164,8 +177,7 @@ public class TestGenericUdf extends GenericUDF {
     }
   }
 
-  public BooleanWritable evaluateBoolean(DeferredObject[] inputs) throws HiveException {
-    List<BooleanWritable> booleanInputs = new ArrayList<>();
+  protected Boolean evaluateBoolean(DeferredObject[] inputs) throws HiveException {
     boolean finalBoolean = false;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -177,13 +189,10 @@ public class TestGenericUdf extends GenericUDF {
       boolean currentBool = ((BooleanWritable) input.get()).get();
       finalBoolean |= currentBool;
     }
-    BooleanWritable resultBool = new BooleanWritable();
-    resultBool.set(finalBoolean);
-    return resultBool;
+    return finalBoolean;
   }
 
-  public ByteWritable evaluateByte(DeferredObject[] inputs) throws HiveException {
-    List<ByteWritable> byteInputs = new ArrayList<>();
+  protected Byte evaluateByte(DeferredObject[] inputs) throws HiveException {
     byte finalByte = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -195,13 +204,10 @@ public class TestGenericUdf extends GenericUDF {
       byte currentByte = ((ByteWritable) input.get()).get();
       finalByte += currentByte;
     }
-    ByteWritable resultByte = new ByteWritable();
-    resultByte.set(finalByte);
-    return resultByte;
+    return finalByte;
   }
 
-  public ShortWritable evaluateShort(DeferredObject[] inputs) throws HiveException {
-    List<ShortWritable> shortInputs = new ArrayList<>();
+  protected Short evaluateShort(DeferredObject[] inputs) throws HiveException {
     short finalShort = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -213,13 +219,10 @@ public class TestGenericUdf extends GenericUDF {
       short currentShort = ((ShortWritable) input.get()).get();
       finalShort += currentShort;
     }
-    ShortWritable resultShort = new ShortWritable();
-    resultShort.set(finalShort);
-    return resultShort;
+    return finalShort;
   }
 
-  public IntWritable evaluateInt(DeferredObject[] inputs) throws HiveException {
-    List<IntWritable> intInputs = new ArrayList<>();
+  protected Integer evaluateInt(DeferredObject[] inputs) throws HiveException {
     int finalInt = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -231,13 +234,10 @@ public class TestGenericUdf extends GenericUDF {
       int currentInt = ((IntWritable) input.get()).get();
       finalInt += currentInt;
     }
-    IntWritable resultInt = new IntWritable();
-    resultInt.set(finalInt);
-    return resultInt;
+    return finalInt;
   }
 
-  public LongWritable evaluateLong(DeferredObject[] inputs) throws HiveException {
-    List<LongWritable> longInputs = new ArrayList<>();
+  protected Long evaluateLong(DeferredObject[] inputs) throws HiveException {
     long finalLong = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -249,13 +249,10 @@ public class TestGenericUdf extends GenericUDF {
       long currentLong = ((LongWritable) input.get()).get();
       finalLong += currentLong;
     }
-    LongWritable resultLong = new LongWritable();
-    resultLong.set(finalLong);
-    return resultLong;
+    return finalLong;
   }
 
-  public FloatWritable evaluateFloat(DeferredObject[] inputs) throws HiveException {
-    List<FloatWritable> floatInputs = new ArrayList<>();
+  protected Float evaluateFloat(DeferredObject[] inputs) throws HiveException {
     float finalFloat = 0.0F;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -267,13 +264,10 @@ public class TestGenericUdf extends GenericUDF {
       float currentFloat = ((FloatWritable) input.get()).get();
       finalFloat += currentFloat;
     }
-    FloatWritable resultFloat = new FloatWritable();
-    resultFloat.set(finalFloat);
-    return resultFloat;
+    return finalFloat;
   }
 
-  public DoubleWritable evaluateDouble(DeferredObject[] inputs) throws HiveException {
-    List<DoubleWritable> doubleInputs = new ArrayList<>();
+  protected Double evaluateDouble(DeferredObject[] inputs) throws HiveException {
     double finalDouble = 0.0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -285,13 +279,10 @@ public class TestGenericUdf extends GenericUDF {
       double currentDouble = ((DoubleWritable) input.get()).get();
       finalDouble  += currentDouble;
     }
-    DoubleWritable resultDouble = new DoubleWritable();
-    resultDouble.set(finalDouble);
-    return resultDouble;
+    return finalDouble;
   }
 
-  public Text evaluateString(DeferredObject[] inputs) throws HiveException {
-    List<String> stringInputs = new ArrayList<>();
+  protected String evaluateString(DeferredObject[] inputs) throws HiveException {
     String finalString = "";
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -303,13 +294,12 @@ public class TestGenericUdf extends GenericUDF {
       String currentString = ((Text) input.get()).toString();
       finalString += currentString;
     }
-    Text resultString = new Text();
-    resultString.set(finalString);
-    return resultString;
+    return finalString;
   }
 
-  public BytesWritable evaluateBinary(DeferredObject[] inputs) throws HiveException {
-    byte[] result = null;
+  protected byte[] evaluateBinary(DeferredObject[] inputs) throws HiveException {
+    int resultLength = 0;
+
     for (DeferredObject input : inputs) {
       if (input == null) {
         return null;
@@ -319,16 +309,88 @@ public class TestGenericUdf extends GenericUDF {
             "Expected BytesWritable but got " + input.get().getClass());
       }
       byte[] currentArray = ((BytesWritable) input.get()).getBytes();
-      // Unlike other functions, simply return last argument.
-      result = currentArray;
+      resultLength += currentArray.length;
     }
+    int pos = 0;
+    byte[] result = new byte[resultLength];
+    for (DeferredObject input : inputs) {
+      byte[] currentArray = ((BytesWritable) input.get()).getBytes();
+      System.arraycopy(
+          currentArray, 0, result, pos, currentArray.length);
+      pos += currentArray.length;
+    }
+    return result;
+  }
+
+  // The evaluate*Wrapped functions below get the result from evaluate*
+  // and wrap it in a Writable* class.
+
+  protected Object evaluateBooleanWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    BooleanWritable resultBool = new BooleanWritable();
+    resultBool.set(evaluateBoolean(inputs));
+    return resultBool;
+  }
+
+  protected Object evaluateByteWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    ByteWritable resultByte = new ByteWritable();
+    resultByte.set(evaluateByte(inputs));
+    return resultByte;
+  }
+
+  protected Object evaluateShortWrapped(DeferredObject[] inputs)
+     throws HiveException {
+    ShortWritable resultShort = new ShortWritable();
+    resultShort.set(evaluateShort(inputs));
+    return resultShort;
+  }
+
+  protected Object evaluateIntWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    IntWritable resultInt = new IntWritable();
+    resultInt.set(evaluateInt(inputs));
+    return resultInt;
+  }
+
+  protected Object evaluateLongWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    LongWritable resultLong = new LongWritable();
+    resultLong.set(evaluateLong(inputs));
+    return resultLong;
+  }
+
+  protected Object evaluateFloatWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    FloatWritable resultFloat = new FloatWritable();
+    resultFloat.set(evaluateFloat(inputs));
+    return resultFloat;
+  }
+
+  protected Object evaluateDoubleWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    DoubleWritable resultDouble = new DoubleWritable();
+    resultDouble.set(evaluateDouble(inputs));
+    return resultDouble;
+  }
+
+  protected Object evaluateStringWrapped(DeferredObject[] inputs) throws HiveException {
+    Text resultString = new Text();
+    resultString.set(evaluateString(inputs));
+    return resultString;
+  }
+
+  protected Object evaluateBinaryWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    byte[] result = evaluateBinary(inputs);
+    if (result == null) return null;
     BytesWritable resultBinary = new BytesWritable();
-    if (result != null) resultBinary.set(result, 0, result.length);
+    resultBinary.set(result, 0, result.length);
     return resultBinary;
   }
 
-  private String getSignatureString(PrimitiveCategory argAndRetType_,
+  protected String getSignatureString(PrimitiveCategory argAndRetType_,
       List<PrimitiveCategory> inputTypes_) {
     return argAndRetType_ + "TestGenericUdf(" + Joiner.on(",").join(inputTypes_) + ")";
   }
-}
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdfWithJavaReturnTypes.java b/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdfWithJavaReturnTypes.java
new file mode 100644
index 000000000..4c7f5cb12
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdfWithJavaReturnTypes.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.hive.executor;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+import com.google.common.base.Joiner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple Generic UDFs for testing.
+ *
+ * This class overrides a few methods in TestGenericUdf to return primitive
+ * Java types instead of Hive's writable classes. Otherwise this class behaves
+ * exactly the same way as TestGenericUdf. See TestGenericUdf for more information.
+ *
+ * Similarly to TestGenericUdf, this class also has a copy in java/test-hive-udfs.
+ *
+ */
+public class TestGenericUdfWithJavaReturnTypes extends TestGenericUdf {
+
+  public TestGenericUdfWithJavaReturnTypes() {
+  }
+
+  @Override
+  protected PrimitiveObjectInspector getReturnObjectInspector(
+        PrimitiveObjectInspector oi) {
+    PrimitiveTypeInfo typeInfo = oi.getTypeInfo();
+    return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo);
+  }
+
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return "TestGenericUdfWithJavaReturnTypes";
+  }
+
+  // The evaluate*Wrapped functions below simply return the results of
+  // evaluate*.
+
+  @Override
+  protected Object evaluateBooleanWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateBoolean(inputs);
+  }
+
+  @Override
+  protected Object evaluateByteWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateByte(inputs);
+  }
+
+  @Override
+  protected Object evaluateShortWrapped(DeferredObject[] inputs)
+     throws HiveException {
+    return evaluateShort(inputs);
+  }
+
+  @Override
+  protected Object evaluateIntWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateInt(inputs);
+  }
+
+  @Override
+  protected Object evaluateLongWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateLong(inputs);
+  }
+
+  @Override
+  protected Object evaluateFloatWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateFloat(inputs);
+  }
+
+  @Override
+  protected Object evaluateDoubleWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateDouble(inputs);
+  }
+
+  @Override
+  protected Object evaluateStringWrapped(DeferredObject[] inputs) throws HiveException {
+    return evaluateString(inputs);
+  }
+
+  @Override
+  protected Object evaluateBinaryWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateBinary(inputs);
+  }
+
+  @Override
+  protected String getSignatureString(PrimitiveCategory argAndRetType_,
+      List<PrimitiveCategory> inputTypes_) {
+    return argAndRetType_ + "TestGenericUdfWithJavaReturnTypes(" +
+        Joiner.on(",").join(inputTypes_) + ")";
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java b/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
index ecde05c30..a2e617a8e 100644
--- a/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
+++ b/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAscii;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
 import org.apache.hadoop.hive.ql.udf.UDFAtan;
+import org.apache.hadoop.hive.ql.udf.UDFBase64;
 import org.apache.hadoop.hive.ql.udf.UDFBin;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
 import org.apache.hadoop.hive.ql.udf.UDFCos;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hive.ql.udf.UDFSpace;
 import org.apache.hadoop.hive.ql.udf.UDFSqrt;
 import org.apache.hadoop.hive.ql.udf.UDFSubstr;
 import org.apache.hadoop.hive.ql.udf.UDFTan;
+import org.apache.hadoop.hive.ql.udf.UDFUnbase64;
 import org.apache.hadoop.hive.ql.udf.UDFUnhex;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBRound;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper;
@@ -189,9 +191,10 @@ public class UdfExecutorTest {
       return Type.FLOAT;
     } else if (w instanceof ImpalaDoubleWritable) {
       return Type.DOUBLE;
-    } else if (w instanceof ImpalaBytesWritable || w instanceof ImpalaTextWritable
-        || w instanceof String) {
+    } else if (w instanceof ImpalaTextWritable || w instanceof String) {
       return Type.STRING;
+    } else if (w instanceof ImpalaBytesWritable) {
+      return Type.BINARY;
     }
     Preconditions.checkArgument(false);
     return Type.INVALID;
@@ -202,6 +205,7 @@ public class UdfExecutorTest {
   void validateArgType(Object w) {
     if (w instanceof String ||
         w instanceof Text ||
+        w instanceof ImpalaBytesWritable ||
         w instanceof ImpalaIntWritable ||
         w instanceof ImpalaFloatWritable ||
         w instanceof ImpalaBigIntWritable ||
@@ -347,7 +351,8 @@ public class UdfExecutorTest {
             errMsgs.add("Actual double:   " + actual);
           }
           break;
-        case STRING: {
+        case STRING:
+        case BINARY: {
           byte[] expectedBytes = null;
           if (expectedValue instanceof ImpalaBytesWritable) {
             expectedBytes = ((ImpalaBytesWritable)expectedValue).getBytes();
@@ -435,13 +440,15 @@ public class UdfExecutorTest {
     TestHiveUdf(UDFE.class, createDouble(Math.E));
     TestHiveUdf(UDFSign.class, createDouble(1), createDouble(3));
 
-    TestHiveUdf(UDFBin.class, createBytes("1100100"), createBigInt(100));
+    TestHiveUdf(UDFBin.class, createText("1100100"), createBigInt(100));
 
-    TestHiveUdf(UDFHex.class, createBytes("1F4"), createBigInt(500));
-    TestHiveUdf(UDFHex.class, createBytes("3E8"), createBigInt(1000));
+    TestHiveUdf(UDFHex.class, createText("1F4"), createBigInt(500));
+    TestHiveUdf(UDFHex.class, createText("3E8"), createBigInt(1000));
 
     TestHiveUdf(UDFHex.class, createText("31303030"), "1000");
-    TestHiveUdf(UDFUnhex.class, createText("aAzZ"), "61417A5A");
+    TestHiveUdf(UDFUnhex.class, createBytes("aAzZ"), "61417A5A");
+    TestHiveUdf(UDFBase64.class, createText("YWJjZA=="), createBytes("abcd"));
+    TestHiveUdf(UDFUnbase64.class, createBytes("abcd"), "YWJjZA==");
     TestHiveUdf(UDFConv.class, createText("1111011"),
         "123", createInt(10), createInt(2));
     freeAllocations();
@@ -471,42 +478,47 @@ public class UdfExecutorTest {
     TestHiveUdf(GenericUDFUpper.class, createText("HELLO"), createText("Hello"));
   }
 
+
+  void TestGenericUdf(Writable expectedValue, Object... args)
+      throws MalformedURLException, ImpalaException, TException {
+    // Test with both TestGenericUdf and TestGenericUdfWithJavaReturnTypes to cover
+    // both Writable and primitive Java return types.
+    TestUdf(null, TestGenericUdf.class, expectedValue, args);
+    TestUdf(null, TestGenericUdfWithJavaReturnTypes.class, expectedValue, args);
+  }
+
   @Test
   // Test GenericUDF for all supported types
   public void BasicGenericTest()
       throws ImpalaException, MalformedURLException, TException {
-    TestUdf(null, TestGenericUdf.class, createBoolean(true), createBoolean(true));
-    TestUdf(null, TestGenericUdf.class, createTinyInt(1), createTinyInt(1));
-    TestUdf(null, TestGenericUdf.class, createSmallInt(1), createSmallInt(1));
-    TestUdf(null, TestGenericUdf.class, createInt(1), createInt(1));
-    TestUdf(null, TestGenericUdf.class, createBigInt(1), createBigInt(1));
-    TestUdf(null, TestGenericUdf.class, createFloat(1.1f), createFloat(1.1f));
-    TestUdf(null, TestGenericUdf.class, createDouble(1.1), createDouble(1.1));
-    TestUdf(null, TestGenericUdf.class, createText("ABCD"), createText("ABCD"));
-    TestUdf(null, TestGenericUdf.class, createDouble(3),
-        createDouble(1), createDouble(2));
-    TestUdf(null, TestGenericUdf.class, createText("ABCXYZ"), createText("ABC"),
-        createText("XYZ"));
-    TestUdf(null, TestGenericUdf.class, createInt(3), createInt(1), createInt(2));
-    TestUdf(null, TestGenericUdf.class, createFloat(1.1f + 1.2f),
-        createFloat(1.1f), createFloat(1.2f));
-    TestUdf(null, TestGenericUdf.class, createDouble(1.1 + 1.2 + 1.3),
+    TestGenericUdf(createBoolean(true), createBoolean(true));
+    TestGenericUdf(createTinyInt(1), createTinyInt(1));
+    TestGenericUdf(createSmallInt(1), createSmallInt(1));
+    TestGenericUdf(createInt(1), createInt(1));
+    TestGenericUdf(createBigInt(1), createBigInt(1));
+    TestGenericUdf(createFloat(1.1f), createFloat(1.1f));
+    TestGenericUdf(createDouble(1.1), createDouble(1.1));
+    TestGenericUdf(createText("ABCD"), createText("ABCD"));
+    TestGenericUdf(createBytes("ABCD"), createBytes("ABCD"));
+    TestGenericUdf(createDouble(3), createDouble(1), createDouble(2));
+    TestGenericUdf(createText("ABCXYZ"), createText("ABC"), createText("XYZ"));
+    TestGenericUdf(createInt(3), createInt(1), createInt(2));
+    TestGenericUdf(createFloat(1.1f + 1.2f), createFloat(1.1f), createFloat(1.2f));
+    TestGenericUdf(createDouble(1.1 + 1.2 + 1.3),
         createDouble(1.1), createDouble(1.2), createDouble(1.3));
-    TestUdf(null, TestGenericUdf.class, createSmallInt(1 + 2), createSmallInt(1),
-        createSmallInt(2));
-    TestUdf(null, TestGenericUdf.class, createBoolean(true),
-        createBoolean(true), createBoolean(false));
-    TestUdf(null, TestGenericUdf.class, createInt(5 + 6 + 7), createInt(5),
-        createInt(6), createInt(7));
-    TestUdf(null, TestGenericUdf.class, createBoolean(true),
+    TestGenericUdf(createSmallInt(1 + 2), createSmallInt(1), createSmallInt(2));
+    TestGenericUdf(createBoolean(true), createBoolean(true), createBoolean(false));
+    TestGenericUdf(createInt(5 + 6 + 7), createInt(5), createInt(6), createInt(7));
+    TestGenericUdf(createBoolean(true),
         createBoolean(false), createBoolean(false), createBoolean(true));
-    TestUdf(null, TestGenericUdf.class, createFloat(1.1f + 1.2f + 1.3f),
+    TestGenericUdf(createFloat(1.1f + 1.2f + 1.3f),
         createFloat(1.1f), createFloat(1.2f), createFloat(1.3f));
-    TestUdf(null, TestGenericUdf.class, createInt(5 + 6 + 7 + 8), createInt(5),
+    TestGenericUdf(createInt(5 + 6 + 7 + 8), createInt(5),
         createInt(6), createInt(7), createInt(8));
-    TestUdf(null, TestGenericUdf.class, createBoolean(true),
+    TestGenericUdf(createBoolean(true),
         createBoolean(true), createBoolean(true), createBoolean(true),
         createBoolean(true));
+    TestGenericUdf(createBytes("ABCDEFGH"), createBytes("ABCD"), createBytes("EFGH"));
     freeAllocations();
   }
 
diff --git a/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java b/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java
index 32ea1f66c..2b1e05269 100644
--- a/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java
+++ b/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -50,9 +51,14 @@ import java.util.Set;
  * Simple Generic UDFs for testing.
  *
  * Udf that takes a variable number of arguments of the same type and applies
- * the "+" operator to them. The "+" is a concatenation for string types. For
- * boolean types, it applies the OR operation. If only one argument is provided,
- * it returns that argument.
+ * the "+" operator to them. The "+" is a concatenation for string and binary types.
+ * For boolean types, it applies the OR operation. If only one argument is provided,
+ * it returns that argument. If any argument is NULL, it returns NULL.
+ *
+ * For all argument types the return type is a Writable class (e.g. IntWritable).
+ * Generic UDFs can also return Java primitive classes (e.g. Integer). A separate
+ * UDF class (TestGenericUdfWithJavaReturnTypes) is created with similar behavior
+ * to this one but different return types.
  *
  * This class is a copy of the TestGenericUdf class in the FE. We need this class in a
  * separate project so we can test loading UDF jars that are not already on the
@@ -66,7 +72,7 @@ import java.util.Set;
  */
 public class TestGenericUdf extends GenericUDF {
 
-  private List<PrimitiveCategory> inputTypes_ = new ArrayList<>();
+  private List<PrimitiveCategory> inputTypes_;
   private PrimitiveObjectInspector retTypeOI_;
   private PrimitiveCategory argAndRetType_;
 
@@ -94,6 +100,8 @@ public class TestGenericUdf extends GenericUDF {
       throw new UDFArgumentException("No arguments provided.");
     }
 
+    // Resetting here as initialize can be called more than once by Hive.
+    inputTypes_  = new ArrayList<>();
     for (ObjectInspector oi : arguments) {
       if (!(oi instanceof PrimitiveObjectInspector)) {
         throw new UDFArgumentException("Found an input that is not a primitive.");
@@ -102,8 +110,8 @@ public class TestGenericUdf extends GenericUDF {
       inputTypes_.add(poi.getPrimitiveCategory());
     }
 
-    // return type is always same as last argument
-    retTypeOI_ = (PrimitiveObjectInspector) arguments[0];
+    // return type is always same as first argument
+    retTypeOI_ = getReturnObjectInspector((PrimitiveObjectInspector) arguments[0]);
 
     argAndRetType_ = retTypeOI_.getPrimitiveCategory();
 
@@ -111,32 +119,40 @@ public class TestGenericUdf extends GenericUDF {
     return retTypeOI_;
   }
 
+  protected PrimitiveObjectInspector getReturnObjectInspector(
+      PrimitiveObjectInspector oi) {
+    // Simply returns the same object inspector. Subclasses can override this to return
+    // different types of object inspectors.
+    return oi;
+  }
+
   @Override
   public Object evaluate(DeferredObject[] arguments)
       throws HiveException {
     if (arguments.length != inputTypes_.size()) {
       throw new HiveException("Number of arguments passed in did not match number of " +
-          "arguments expected.");
+          "arguments expected. Expected: "
+              + inputTypes_.size() + " actual: " +  arguments.length);
     }
     switch (argAndRetType_) {
       case BOOLEAN:
-        return evaluateBoolean(arguments);
+        return evaluateBooleanWrapped(arguments);
       case BYTE:
-        return evaluateByte(arguments);
+        return evaluateByteWrapped(arguments);
       case SHORT:
-        return evaluateShort(arguments);
+        return evaluateShortWrapped(arguments);
       case INT:
-        return evaluateInt(arguments);
+        return evaluateIntWrapped(arguments);
       case LONG:
-        return evaluateLong(arguments);
+        return evaluateLongWrapped(arguments);
       case FLOAT:
-        return evaluateFloat(arguments);
+        return evaluateFloatWrapped(arguments);
       case DOUBLE:
-        return evaluateDouble(arguments);
+        return evaluateDoubleWrapped(arguments);
       case STRING:
-        return evaluateString(arguments);
+        return evaluateStringWrapped(arguments);
       case BINARY:
-        return evaluateBinary(arguments);
+        return evaluateBinaryWrapped(arguments);
       case DATE:
       case TIMESTAMP:
       default:
@@ -164,8 +180,7 @@ public class TestGenericUdf extends GenericUDF {
     }
   }
 
-  public BooleanWritable evaluateBoolean(DeferredObject[] inputs) throws HiveException {
-    List<BooleanWritable> booleanInputs = new ArrayList<>();
+  protected Boolean evaluateBoolean(DeferredObject[] inputs) throws HiveException {
     boolean finalBoolean = false;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -177,13 +192,10 @@ public class TestGenericUdf extends GenericUDF {
       boolean currentBool = ((BooleanWritable) input.get()).get();
       finalBoolean |= currentBool;
     }
-    BooleanWritable resultBool = new BooleanWritable();
-    resultBool.set(finalBoolean);
-    return resultBool;
+    return finalBoolean;
   }
 
-  public ByteWritable evaluateByte(DeferredObject[] inputs) throws HiveException {
-    List<ByteWritable> byteInputs = new ArrayList<>();
+  protected Byte evaluateByte(DeferredObject[] inputs) throws HiveException {
     byte finalByte = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -195,13 +207,10 @@ public class TestGenericUdf extends GenericUDF {
       byte currentByte = ((ByteWritable) input.get()).get();
       finalByte += currentByte;
     }
-    ByteWritable resultByte = new ByteWritable();
-    resultByte.set(finalByte);
-    return resultByte;
+    return finalByte;
   }
 
-  public ShortWritable evaluateShort(DeferredObject[] inputs) throws HiveException {
-    List<ShortWritable> shortInputs = new ArrayList<>();
+  protected Short evaluateShort(DeferredObject[] inputs) throws HiveException {
     short finalShort = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -213,13 +222,10 @@ public class TestGenericUdf extends GenericUDF {
       short currentShort = ((ShortWritable) input.get()).get();
       finalShort += currentShort;
     }
-    ShortWritable resultShort = new ShortWritable();
-    resultShort.set(finalShort);
-    return resultShort;
+    return finalShort;
   }
 
-  public IntWritable evaluateInt(DeferredObject[] inputs) throws HiveException {
-    List<IntWritable> intInputs = new ArrayList<>();
+  protected Integer evaluateInt(DeferredObject[] inputs) throws HiveException {
     int finalInt = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -231,13 +237,10 @@ public class TestGenericUdf extends GenericUDF {
       int currentInt = ((IntWritable) input.get()).get();
       finalInt += currentInt;
     }
-    IntWritable resultInt = new IntWritable();
-    resultInt.set(finalInt);
-    return resultInt;
+    return finalInt;
   }
 
-  public LongWritable evaluateLong(DeferredObject[] inputs) throws HiveException {
-    List<LongWritable> longInputs = new ArrayList<>();
+  protected Long evaluateLong(DeferredObject[] inputs) throws HiveException {
     long finalLong = 0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -249,13 +252,10 @@ public class TestGenericUdf extends GenericUDF {
       long currentLong = ((LongWritable) input.get()).get();
       finalLong += currentLong;
     }
-    LongWritable resultLong = new LongWritable();
-    resultLong.set(finalLong);
-    return resultLong;
+    return finalLong;
   }
 
-  public FloatWritable evaluateFloat(DeferredObject[] inputs) throws HiveException {
-    List<FloatWritable> floatInputs = new ArrayList<>();
+  protected Float evaluateFloat(DeferredObject[] inputs) throws HiveException {
     float finalFloat = 0.0F;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -267,13 +267,10 @@ public class TestGenericUdf extends GenericUDF {
       float currentFloat = ((FloatWritable) input.get()).get();
       finalFloat += currentFloat;
     }
-    FloatWritable resultFloat = new FloatWritable();
-    resultFloat.set(finalFloat);
-    return resultFloat;
+    return finalFloat;
   }
 
-  public DoubleWritable evaluateDouble(DeferredObject[] inputs) throws HiveException {
-    List<DoubleWritable> doubleInputs = new ArrayList<>();
+  protected Double evaluateDouble(DeferredObject[] inputs) throws HiveException {
     double finalDouble = 0.0;
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -285,13 +282,10 @@ public class TestGenericUdf extends GenericUDF {
       double currentDouble = ((DoubleWritable) input.get()).get();
       finalDouble  += currentDouble;
     }
-    DoubleWritable resultDouble = new DoubleWritable();
-    resultDouble.set(finalDouble);
-    return resultDouble;
+    return finalDouble;
   }
 
-  public Text evaluateString(DeferredObject[] inputs) throws HiveException {
-    List<String> stringInputs = new ArrayList<>();
+  protected String evaluateString(DeferredObject[] inputs) throws HiveException {
     String finalString = "";
     for (DeferredObject input : inputs) {
       if (input == null) {
@@ -303,13 +297,12 @@ public class TestGenericUdf extends GenericUDF {
       String currentString = ((Text) input.get()).toString();
       finalString += currentString;
     }
-    Text resultString = new Text();
-    resultString.set(finalString);
-    return resultString;
+    return finalString;
   }
 
-  public BytesWritable evaluateBinary(DeferredObject[] inputs) throws HiveException {
-    byte[] result = null;
+  protected byte[] evaluateBinary(DeferredObject[] inputs) throws HiveException {
+    int resultLength = 0;
+
     for (DeferredObject input : inputs) {
       if (input == null) {
         return null;
@@ -319,15 +312,87 @@ public class TestGenericUdf extends GenericUDF {
             "Expected BytesWritable but got " + input.get().getClass());
       }
       byte[] currentArray = ((BytesWritable) input.get()).getBytes();
-      // Unlike other functions, simply return last argument.
-      result = currentArray;
+      resultLength += currentArray.length;
+    }
+    int pos = 0;
+    byte[] result = new byte[resultLength];
+    for (DeferredObject input : inputs) {
+      byte[] currentArray = ((BytesWritable) input.get()).getBytes();
+      System.arraycopy(
+          currentArray, 0, result, pos, currentArray.length);
+      pos += currentArray.length;
     }
+    return result;
+  }
+
+  // The evaluate*Wrapped functions below get the result from evaluate*
+  // and wrap it in a Writable* class.
+
+  protected Object evaluateBooleanWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    BooleanWritable resultBool = new BooleanWritable();
+    resultBool.set(evaluateBoolean(inputs));
+    return resultBool;
+  }
+
+  protected Object evaluateByteWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    ByteWritable resultByte = new ByteWritable();
+    resultByte.set(evaluateByte(inputs));
+    return resultByte;
+  }
+
+  protected Object evaluateShortWrapped(DeferredObject[] inputs)
+     throws HiveException {
+    ShortWritable resultShort = new ShortWritable();
+    resultShort.set(evaluateShort(inputs));
+    return resultShort;
+  }
+
+  protected Object evaluateIntWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    IntWritable resultInt = new IntWritable();
+    resultInt.set(evaluateInt(inputs));
+    return resultInt;
+  }
+
+  protected Object evaluateLongWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    LongWritable resultLong = new LongWritable();
+    resultLong.set(evaluateLong(inputs));
+    return resultLong;
+  }
+
+  protected Object evaluateFloatWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    FloatWritable resultFloat = new FloatWritable();
+    resultFloat.set(evaluateFloat(inputs));
+    return resultFloat;
+  }
+
+  protected Object evaluateDoubleWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    DoubleWritable resultDouble = new DoubleWritable();
+    resultDouble.set(evaluateDouble(inputs));
+    return resultDouble;
+  }
+
+  protected Object evaluateStringWrapped(DeferredObject[] inputs) throws HiveException {
+    Text resultString = new Text();
+    resultString.set(evaluateString(inputs));
+    return resultString;
+  }
+
+  protected Object evaluateBinaryWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    byte[] result = evaluateBinary(inputs);
+    if (result == null) return null;
     BytesWritable resultBinary = new BytesWritable();
-    if (result != null) resultBinary.set(result, 0, result.length);
+    resultBinary.set(result, 0, result.length);
     return resultBinary;
   }
 
-  private String getSignatureString(PrimitiveCategory argAndRetType_,
+  protected String getSignatureString(PrimitiveCategory argAndRetType_,
       List<PrimitiveCategory> inputTypes_) {
     return argAndRetType_ + "TestGenericUdf(" + Joiner.on(",").join(inputTypes_) + ")";
   }
diff --git a/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdfWithJavaReturnTypes.java b/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdfWithJavaReturnTypes.java
new file mode 100644
index 000000000..0b8d94145
--- /dev/null
+++ b/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdfWithJavaReturnTypes.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+import com.google.common.base.Joiner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple Generic UDFs for testing.
+ *
+ * This class overrides a few methods in TestGenericUdf to return primitive
+ * Java types instead of Hive's writable classes. Otherwise this class behaves
+ * exactly the same way as TestGenericUdf. See TestGenericUdf for more information.
+ *
+ * Similarly to TestGenericUdf, this class also has a copy in the FE.
+ *
+ */
+public class TestGenericUdfWithJavaReturnTypes extends TestGenericUdf {
+
+  public TestGenericUdfWithJavaReturnTypes() {
+  }
+
+  @Override
+  protected PrimitiveObjectInspector getReturnObjectInspector(
+        PrimitiveObjectInspector oi) {
+    PrimitiveTypeInfo typeInfo = oi.getTypeInfo();
+    return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo);
+  }
+
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return "TestGenericUdfWithJavaReturnTypes";
+  }
+
+  // The evaluate*Wrapped functions below simply return the results of
+  // evaluate*.
+
+  @Override
+  protected Object evaluateBooleanWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateBoolean(inputs);
+  }
+
+  @Override
+  protected Object evaluateByteWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateByte(inputs);
+  }
+
+  @Override
+  protected Object evaluateShortWrapped(DeferredObject[] inputs)
+     throws HiveException {
+    return evaluateShort(inputs);
+  }
+
+  @Override
+  protected Object evaluateIntWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateInt(inputs);
+  }
+
+  @Override
+  protected Object evaluateLongWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateLong(inputs);
+  }
+
+  @Override
+  protected Object evaluateFloatWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateFloat(inputs);
+  }
+
+  @Override
+  protected Object evaluateDoubleWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateDouble(inputs);
+  }
+
+  @Override
+  protected Object evaluateStringWrapped(DeferredObject[] inputs) throws HiveException {
+    return evaluateString(inputs);
+  }
+
+  @Override
+  protected Object evaluateBinaryWrapped(DeferredObject[] inputs)
+      throws HiveException {
+    return evaluateBinary(inputs);
+  }
+
+  @Override
+  protected String getSignatureString(PrimitiveCategory argAndRetType_,
+      List<PrimitiveCategory> inputTypes_) {
+    return argAndRetType_ + "TestGenericUdfWithJavaReturnTypes(" +
+        Joiner.on(",").join(inputTypes_) + ")";
+  }
+}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test b/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test
index 4b0849746..3651cb23a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test
@@ -22,53 +22,76 @@ STRING
 ====
 ---- QUERY
 #Test GenericUDF functions
-select generic_identity(true), generic_identity(cast(NULL as boolean));
+select generic_identity(true), generic_identity(cast(NULL as boolean)),
+    generic_identity_java_ret_type(true),
+    generic_identity_java_ret_type(cast(NULL as boolean));
 ---- TYPES
-boolean, boolean
+boolean, boolean, boolean, boolean
 ---- RESULTS
-true,NULL
+true,NULL,true,NULL
 ====
 ---- QUERY
-select generic_identity(cast(10 as tinyint)), generic_identity(cast(NULL as tinyint));
+select generic_identity(cast(10 as tinyint)), generic_identity(cast(NULL as tinyint)),
+    generic_identity_java_ret_type(cast(10 as tinyint)),
+    generic_identity_java_ret_type(cast(NULL as tinyint));
 ---- TYPES
-tinyint, tinyint
+tinyint, tinyint, tinyint, tinyint
 ---- RESULTS
-10,NULL
+10,NULL,10,NULL
 ====
 ---- QUERY
-select generic_identity(cast(10 as smallint)), generic_identity(cast(NULL as smallint));
+select generic_identity(cast(10 as smallint)), generic_identity(cast(NULL as smallint)),
+    generic_identity_java_ret_type(cast(10 as smallint)),
+    generic_identity_java_ret_type(cast(NULL as smallint));
 ---- TYPES
-smallint, smallint
+smallint, smallint, smallint, smallint
 ---- RESULTS
-10,NULL
+10,NULL,10,NULL
 ====
 ---- QUERY
-select generic_identity(cast(10 as int)), generic_identity(cast(NULL as int));
+select generic_identity(cast(10 as int)), generic_identity(cast(NULL as int)),
+    generic_identity_java_ret_type(cast(10 as int)),
+    generic_identity_java_ret_type(cast(NULL as int));
 ---- TYPES
-int, int
+int, int, int, int
 ---- RESULTS
-10,NULL
+10,NULL,10,NULL
 ====
 ---- QUERY
-select generic_identity(cast(10 as bigint)), generic_identity(cast(NULL as bigint));
+select generic_identity(cast(10 as bigint)), generic_identity(cast(NULL as bigint)),
+    generic_identity_java_ret_type(cast(10 as bigint)),
+    generic_identity_java_ret_type(cast(NULL as bigint));
 ---- TYPES
-bigint, bigint
+bigint, bigint, bigint, bigint
 ---- RESULTS
-10,NULL
+10,NULL,10,NULL
 ====
 ---- QUERY
-select generic_identity(cast(10.0 as float)), generic_identity(cast(NULL as float));
+select generic_identity(cast(10.0 as float)), generic_identity(cast(NULL as float)),
+    generic_identity_java_ret_type(cast(10.0 as float)),
+    generic_identity_java_ret_type(cast(NULL as float));
 ---- TYPES
-float, float
+float, float, float, float
 ---- RESULTS
-10,NULL
+10,NULL,10,NULL
 ====
 ---- QUERY
-select generic_identity(cast(10.0 as double)), generic_identity(cast(NULL as double));
+select generic_identity(cast(10.0 as double)), generic_identity(cast(NULL as double)),
+    generic_identity_java_ret_type(cast(10.0 as double)),
+    generic_identity_java_ret_type(cast(NULL as double));
 ---- TYPES
-double, double
+double, double, double, double
 ---- RESULTS
-10,NULL
+10,NULL,10,NULL
+====
+---- QUERY
+select generic_identity(cast("a" as binary)), generic_identity(cast(NULL as binary)),
+    generic_identity_java_ret_type(cast("a" as binary)),
+    generic_identity_java_ret_type(cast(NULL as binary))
+---- TYPES
+binary, binary, binary, binary
+---- RESULTS
+'a','NULL','a','NULL'
 ====
 ---- QUERY
 # IMPALA-1134. Tests that strings are copied correctly
@@ -81,11 +104,24 @@ int, int, int
 10,20,30
 ====
 ---- QUERY
-select generic_identity(cast("a" as binary)), generic_identity(cast(NULL as binary));
+# IMPALA-1134. Tests that binaries are copied correctly
+select length(generic_identity(cast("0123456789" as binary))),
+       length(generic_add(cast("0123456789" as binary), cast("0123456789" as binary))),
+       length(generic_add(cast("0123456789" as binary), cast("0123456789" as binary),
+           cast("0123456789" as binary)));
+---- TYPES
+int, int, int
+---- RESULTS
+10,20,30
+====
+---- QUERY
+# Test UDFs with java return types with multiple arguments.
+select generic_add_java_ret_type(1, 2),
+       generic_add_java_ret_type("a", "bc", "d");
 ---- TYPES
-binary, binary
+int, string
 ---- RESULTS
-'a','NULL'
+3,'abcd'
 ====
 ---- QUERY
 # IMPALA-1392: Hive UDFs that throw exceptions should return NULL
diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test b/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test
index ab679bcd4..ca3415c9c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test
@@ -60,6 +60,14 @@ create function generic_add(string, string, string) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.TestGenericUdf';
 
+create function generic_add(binary, binary) returns binary
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdf';
+
+create function generic_add(binary, binary, binary) returns binary
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdf';
+
 create function generic_add(smallint, smallint) returns smallint
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.TestGenericUdf';
@@ -87,4 +95,48 @@ symbol='org.apache.impala.GenericReplaceStringUdf';
 create function generic_import_nearby_classes(string) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.GenericImportsNearbyClassesUdf';
+
+create function generic_identity_java_ret_type(boolean) returns boolean
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(tinyint) returns tinyint
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(smallint) returns smallint
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(int) returns int
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(bigint) returns bigint
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(float) returns float
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(double) returns double
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(string) returns string
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_identity_java_ret_type(binary) returns binary
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_add_java_ret_type(int, int) returns int
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
+
+create function generic_add_java_ret_type(string, string, string) returns string
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdfWithJavaReturnTypes';
 ====