You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2013/04/19 13:19:31 UTC

svn commit: r1469783 - in /pig/trunk: ./ ivy/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/parser/ test/org/apache/pig/builtin/ test/org/apache/pig/test/

Author: jcoveney
Date: Fri Apr 19 11:19:30 2013
New Revision: 1469783

URL: http://svn.apache.org/r1469783
Log:
PIG-3198: Let users use any function from PigType -> PigType as if it were builtin (jcoveney)

Added:
    pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java
    pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java
    pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java
    pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/data/DataType.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 19 11:19:30 2013
@@ -28,6 +28,8 @@ PIG-3174:  Remove rpm and deb artifacts 
 
 IMPROVEMENTS
 
+PIG-3198: Let users use any function from PigType -> PigType as if it were builtin (jcoveney)
+
 PIG-3268: Case statement support (cheolsoo)
 
 PIG-3269: In operator support (cheolsoo)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Apr 19 11:19:30 2013
@@ -329,6 +329,7 @@
             <include name="guava-${guava.version}.jar"/>
             <include name="automaton-${automaton.version}.jar"/>
             <include name="jansi-${jansi.version}.jar"/>
+            <include name="asm*.jar"/>
         </patternset>
     </fileset>
 
@@ -346,6 +347,7 @@
             <include name="log4j*.jar"/>
             <include name="slf4j*.jar"/>
             <include name="jsp-api*.jar"/>
+            <include name="asm*.jar"/>
         </patternset>
     </fileset>
 

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Fri Apr 19 11:19:30 2013
@@ -218,6 +218,7 @@
     <dependency org="dk.brics.automaton" name="automaton" rev="1.11-8" conf="compile->default"/>
 
     <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
+    <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/>
 
     <dependency org="org.apache.hbase" name="hbase" rev="${hbase.version}" conf="compile->master">
       <artifact name="hbase" type="jar"/>

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Fri Apr 19 11:19:30 2013
@@ -85,3 +85,4 @@ aopalliance.version=1.0
 jsr311-api.version=1.1.1
 mockito.version=1.8.4
 jansi.version=1.9
+asm.version=3.3.1

Added: pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,9 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * A function generated at runtime by {@link InvokerGenerator}: it unpacks its
+ * arguments from a tuple and calls one specific Java method.
+ */
+public interface InvokerFunction {
+    /**
+     * Invokes the wrapped method.
+     *
+     * @param input the method arguments; for an instance method, field 0 holds the receiver
+     * @return the method's result, boxed if the method returns a primitive
+     * @throws IOException propagated from reading fields of {@code input}
+     */
+    Object eval(Tuple input) throws IOException;
+}

Added: pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,354 @@
+package org.apache.pig.builtin;
+
+import static org.objectweb.asm.Opcodes.ACC_PUBLIC;
+import static org.objectweb.asm.Opcodes.ACC_SUPER;
+import static org.objectweb.asm.Opcodes.ALOAD;
+import static org.objectweb.asm.Opcodes.ARETURN;
+import static org.objectweb.asm.Opcodes.ASTORE;
+import static org.objectweb.asm.Opcodes.CHECKCAST;
+import static org.objectweb.asm.Opcodes.ICONST_0;
+import static org.objectweb.asm.Opcodes.ICONST_1;
+import static org.objectweb.asm.Opcodes.ICONST_2;
+import static org.objectweb.asm.Opcodes.ICONST_3;
+import static org.objectweb.asm.Opcodes.ICONST_4;
+import static org.objectweb.asm.Opcodes.ICONST_5;
+import static org.objectweb.asm.Opcodes.INVOKEINTERFACE;
+import static org.objectweb.asm.Opcodes.INVOKESPECIAL;
+import static org.objectweb.asm.Opcodes.INVOKESTATIC;
+import static org.objectweb.asm.Opcodes.INVOKEVIRTUAL;
+import static org.objectweb.asm.Opcodes.RETURN;
+import static org.objectweb.asm.Opcodes.V1_6;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.objectweb.asm.ClassWriter;
+import org.objectweb.asm.MethodVisitor;
+import org.python.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+
+//TODO need to add support for ANY Pig type!
+//TODO statically cache the generated code based on the input Strings
+/**
+ * An {@link EvalFunc} that calls a public Java method through a small class generated at
+ * runtime with ASM, so each call is a direct (non-reflective) invocation.
+ * <p>
+ * The constructor takes three values:
+ * <ul>
+ * <li>the name of the class declaring the method, resolved with
+ * {@link PigContext#resolveClassName(String)};</li>
+ * <li>the method name;</li>
+ * <li>a comma-separated list of parameter type names.</li>
+ * </ul>
+ * For a static method, the fields of the input tuple are the arguments. For an instance
+ * method, field 0 is the receiver and the remaining fields are the arguments. The method
+ * must return one of the types listed in {@code returnTypeMap}.
+ */
+public class InvokerGenerator extends EvalFunc<Object> {
+    private String className_;
+    private String methodName_;
+    private String[] argumentTypes_;
+
+    private boolean isInitialized = false;
+
+    private InvokerFunction generatedFunction;
+    private Schema outputSchema;
+
+    private static int uniqueId = 0;
+
+    /** Supported Java return types, mapped to the Pig type reported in the output schema. */
+    private static final Map<Class<?>, Byte> returnTypeMap = new HashMap<Class<?>, Byte>() {{
+        put(String.class, DataType.CHARARRAY);
+        put(Integer.class, DataType.INTEGER);
+        put(Long.class, DataType.LONG);
+        put(Float.class, DataType.FLOAT);
+        put(Double.class, DataType.DOUBLE);
+        put(Boolean.class, DataType.BOOLEAN);
+        //put(byte[].class, DataType.BYTEARRAY);
+        put(Integer.TYPE, DataType.INTEGER);
+        put(Long.TYPE, DataType.LONG);
+        put(Float.TYPE, DataType.FLOAT);
+        put(Double.TYPE, DataType.DOUBLE);
+        put(Boolean.TYPE, DataType.BOOLEAN);
+    }};
+
+    /** Maps each supported primitive type to its wrapper class, and each wrapper back to its primitive. */
+    private static final Map<Class<?>, Class<?>> inverseTypeMap = new HashMap<Class<?>, Class<?>>() {{
+        put(Integer.class, Integer.TYPE);
+        put(Long.class, Long.TYPE);
+        put(Float.class, Float.TYPE);
+        put(Double.class, Double.TYPE);
+        put(Boolean.class, Boolean.TYPE);
+        put(Integer.TYPE, Integer.class);
+        put(Long.TYPE, Long.class);
+        put(Float.TYPE, Float.class);
+        put(Double.TYPE, Double.class);
+        put(Boolean.TYPE, Boolean.class);
+    }};
+
+    /** JVM type descriptors of the supported primitive types. */
+    private static final Map<Class<?>, String> primitiveSignature = new HashMap<Class<?>, String>() {{
+        put(Integer.TYPE, "I");
+        put(Long.TYPE, "J");
+        put(Float.TYPE, "F");
+        put(Double.TYPE, "D");
+        put(Boolean.TYPE, "Z");
+    }};
+
+    /** Type names that are recognized directly, without going through PigContext class resolution. */
+    private static final Map<String,Class<?>> nameToClassObjectMap = new HashMap<String,Class<?>>() {{
+        put("String",String.class);
+        put("Integer", Integer.class);
+        put("int", Integer.TYPE);
+        put("Long", Long.class);
+        put("long", Long.TYPE);
+        put("Float", Float.class);
+        put("float", Float.TYPE);
+        put("Double", Double.class);
+        put("double", Double.TYPE);
+        put("Boolean", Boolean.class);
+        put("boolean", Boolean.TYPE);
+        //put("byte[]", byte[].class);
+        put("java.lang.String",String.class);
+        put("java.lang.Integer", Integer.class);
+        put("java.lang.Long", Long.class);
+        put("java.lang.Float", Float.class);
+        put("java.lang.Double", Double.class);
+        put("java.lang.Boolean", Boolean.class);
+    }};
+
+    /**
+     * @param className name of the class declaring the method
+     * @param methodName name of the public method to call
+     * @param argumentTypes comma-separated parameter type names; empty (or null) for none
+     */
+    public InvokerGenerator(String className, String methodName, String argumentTypes) {
+        className_ = className;
+        methodName_ = methodName;
+        // "".split(",") would yield one empty type name, so an empty list is handled explicitly.
+        if (argumentTypes == null || argumentTypes.trim().length() == 0) {
+            argumentTypes_ = new String[0];
+        } else {
+            argumentTypes_ = argumentTypes.split(",");
+        }
+    }
+
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (!isInitialized) {
+            initialize();
+        }
+
+        return generatedFunction.eval(input);
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        if (!isInitialized) {
+            initialize();
+        }
+
+        return outputSchema;
+    }
+
+    private static int getUniqueId() {
+        return uniqueId++;
+    }
+
+    /**
+     * Resolves the target method, derives the output schema from its return type and
+     * generates the invoker class.
+     *
+     * @throws RuntimeException if the class, method or return type cannot be used
+     */
+    protected void initialize() {
+        Class<?> clazz;
+        try {
+            clazz = PigContext.resolveClassName(className_); //TODO I should probably be using this for all of the Class<?> resolution
+        } catch (IOException e) {
+            throw new RuntimeException("Given className not found: " + className_, e);
+        }
+
+        Class<?>[] arguments = getArgumentClassArray(argumentTypes_);
+
+        Method method;
+        try {
+            method = clazz.getMethod(methodName_, arguments); //must match exactly
+        } catch (SecurityException e) {
+            throw new RuntimeException("Not allowed to call given method["+methodName_+"] on class ["+className_+"] with arguments: " + Arrays.toString(argumentTypes_), e);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("Given method name ["+methodName_+"] does not exist on class ["+className_+"] with arguments: " + Arrays.toString(argumentTypes_), e);
+        }
+        boolean isStatic = Modifier.isStatic(method.getModifiers());
+
+        Class<?> returnClazz = method.getReturnType();
+        Byte type;
+        if (returnClazz.isPrimitive()) {
+            type = returnTypeMap.get(inverseTypeMap.get(returnClazz));
+        } else {
+            type = returnTypeMap.get(returnClazz);
+        }
+
+        //TODO add functionality so that if the user pairs this with a cast that it will let you return object
+        if (type == null) {
+            throw new RuntimeException("Function returns invalid type: " + returnClazz);
+        }
+
+        outputSchema = new Schema();
+        outputSchema.add(new Schema.FieldSchema(null, type));
+
+        generatedFunction = generateInvokerFunction("InvokerFunction_"+getUniqueId(), method, isStatic, arguments);
+
+        isInitialized = true;
+    }
+
+    /** Resolves each parameter type name, first via nameToClassObjectMap and then via PigContext. */
+    private Class<?>[] getArgumentClassArray(String[] argumentTypes) {
+        Class<?>[] arguments = new Class<?>[argumentTypes.length];
+        for (int i = 0; i < argumentTypes.length; i++) {
+            // Tolerate "int, int" as well as "int,int".
+            String typeName = argumentTypes[i].trim();
+            try {
+                arguments[i] = nameToClassObjectMap.get(typeName);
+                if (arguments[i] == null) {
+                    arguments[i] = PigContext.resolveClassName(typeName);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to find class in PigContext: " + argumentTypes[i], e);
+            }
+        }
+        return arguments;
+    }
+
+    private InvokerFunction generateInvokerFunction(String className, Method method, boolean isStatic, Class<?>[] arguments) {
+        byte[] byteCode = generateInvokerFunctionBytecode(className, method, isStatic, arguments);
+
+        // The generated class links against the target class. It must therefore be defined under
+        // a loader that can see that class (e.g. one loaded from a registered jar), not only the
+        // system class loader.
+        ClassLoader parent = method.getDeclaringClass().getClassLoader();
+        if (parent == null) {
+            // Bootstrap classes such as java.lang.String report a null loader.
+            parent = InvokerGenerator.class.getClassLoader();
+        }
+        return ByteClassLoader.getInvokerFunction(parent, className, byteCode);
+    }
+
+    /**
+     * Emits a class implementing InvokerFunction whose eval(Tuple) does the following:
+     * reads the receiver (if any) and the arguments from the tuple, casts them, unboxes
+     * primitives, calls the method, and boxes a primitive result.
+     */
+    private byte[] generateInvokerFunctionBytecode(String className, Method method, boolean isStatic, Class<?>[] arguments) {
+        boolean isInterface = method.getDeclaringClass().isInterface();
+
+        // COMPUTE_MAXS lets ASM size the operand stack itself. A fixed max stack of 2 is too
+        // small once the receiver plus arguments need more than two slots (e.g. an instance
+        // method with two arguments, or any long/double argument).
+        ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_MAXS);
+        cw.visit(V1_6, ACC_PUBLIC + ACC_SUPER, className, null, "java/lang/Object", new String[] { "org/apache/pig/builtin/InvokerFunction" });
+
+        makeConstructor(cw);
+
+        MethodVisitor mv = cw.visitMethod(ACC_PUBLIC, "eval", "(Lorg/apache/pig/data/Tuple;)Ljava/lang/Object;", null, new String[] { "java/io/IOException" });
+        mv.visitCode();
+
+        // Local 0 is "this" and local 1 is the input Tuple; unpacked values start at local 2.
+        int next = 2;
+        // Get the receiver/arguments from the Tuple, cast them, and astore them.
+        int begin = 0;
+        if (!isStatic) {
+            loadAndStoreArgument(mv, begin++, next++, getMethodStyleName(method.getDeclaringClass()));
+        }
+
+        for (int i = 0; i < arguments.length; i++) {
+            loadAndStoreArgument(mv, i + begin, next++, getMethodStyleName(getObjectVersion(arguments[i])));
+        }
+
+        // Push the receiver and arguments onto the stack.
+        next = 2;
+
+        if (!isStatic) {
+            mv.visitVarInsn(ALOAD, next++); //put the method receiver on the stack
+        }
+
+        for (Class<?> arg : arguments) {
+            mv.visitVarInsn(ALOAD, next++);
+            unboxIfPrimitive(mv, arg);
+        }
+        String signature = buildSignatureString(arguments, method.getReturnType());
+        mv.visitMethodInsn(isStatic ? INVOKESTATIC : isInterface ? INVOKEINTERFACE : INVOKEVIRTUAL, getMethodStyleName(method.getDeclaringClass()), method.getName(), signature);
+        boxIfPrimitive(mv, method.getReturnType());
+        mv.visitInsn(ARETURN);
+        // The values are recomputed under COMPUTE_MAXS, but visitMaxs must still be called.
+        mv.visitMaxs(2, (isStatic ? 2 : 3) + arguments.length);
+        mv.visitEnd();
+
+        cw.visitEnd();
+
+        return cw.toByteArray();
+    }
+
+    /** Builds the JVM method descriptor for the given parameter and return types. */
+    private String buildSignatureString(Class<?>[] arguments, Class<?> returnClazz) {
+        StringBuilder sig = new StringBuilder("(");
+        for (Class<?> arg : arguments) {
+            sig.append(getDescriptor(arg));
+        }
+        sig.append(')').append(getDescriptor(returnClazz));
+        return sig.toString();
+    }
+
+    /** Returns the JVM field descriptor of clazz, e.g. "I", "Ljava/lang/String;" or "[B". */
+    private String getDescriptor(Class<?> clazz) {
+        if (clazz.isPrimitive()) {
+            return primitiveSignature.get(clazz);
+        }
+        if (clazz.isArray()) {
+            // An array's internal name is already its descriptor.
+            return getMethodStyleName(clazz);
+        }
+        return "L" + getMethodStyleName(clazz) + ";";
+    }
+
+    private Class<?> getObjectVersion(Class<?> clazz) {
+        if (clazz.isPrimitive()) {
+            return inverseTypeMap.get(clazz);
+        }
+        return clazz;
+    }
+
+    /**
+     * Returns the JVM internal name of a reference type (e.g. "java/util/Map$Entry"), or the
+     * descriptor of a supported primitive type.
+     */
+    private String getMethodStyleName(Class<?> clazz) {
+        if (!clazz.isPrimitive()) {
+            // getName() is the binary name (Outer$Inner, [Ljava.lang.String;), which maps directly
+            // onto the internal name. getCanonicalName() would produce Outer/Inner, which does not exist.
+            return clazz.getName().replace('.', '/');
+        }
+        return primitiveSignature.get(clazz);
+    }
+
+    /** If clazz is primitive, converts the value on the stack to its wrapper via valueOf. */
+    private void boxIfPrimitive(MethodVisitor mv, Class<?> clazz) {
+        if (!clazz.isPrimitive()) {
+            return;
+        }
+        String boxedClass = getMethodStyleName(inverseTypeMap.get(clazz));
+        mv.visitMethodInsn(INVOKESTATIC, boxedClass, "valueOf", "("+getMethodStyleName(clazz)+")L"+boxedClass+";");
+    }
+
+    /** If clazz is primitive, converts the wrapper on the stack to the primitive via e.g. intValue(). */
+    private void unboxIfPrimitive(MethodVisitor mv, Class<?> clazz) {
+        if (!clazz.isPrimitive()) {
+            return;
+        }
+        String methodName = clazz.getSimpleName()+"Value";
+        mv.visitMethodInsn(INVOKEVIRTUAL, getMethodStyleName(inverseTypeMap.get(clazz)), methodName, "()"+getMethodStyleName(clazz));
+    }
+
+    /** Emits: local[storeIdx] = (castName) input.get(loadIdx). */
+    private void loadAndStoreArgument(MethodVisitor mv, int loadIdx, int storeIdx, String castName) {
+        mv.visitVarInsn(ALOAD, 1); //loads the 1st argument
+        addConst(mv, loadIdx);
+        mv.visitMethodInsn(INVOKEINTERFACE, "org/apache/pig/data/Tuple", "get", "(I)Ljava/lang/Object;");
+        mv.visitTypeInsn(CHECKCAST, castName);
+        mv.visitVarInsn(ASTORE, storeIdx);
+    }
+
+    /** Pushes the non-negative int constant idx using the shortest suitable instruction. */
+    private void addConst(MethodVisitor mv, int idx) {
+        switch (idx) {
+            case(0): mv.visitInsn(ICONST_0); break;
+            case(1): mv.visitInsn(ICONST_1); break;
+            case(2): mv.visitInsn(ICONST_2); break;
+            case(3): mv.visitInsn(ICONST_3); break;
+            case(4): mv.visitInsn(ICONST_4); break;
+            case(5): mv.visitInsn(ICONST_5); break;
+            default:
+                if (idx < 0) {
+                    throw new RuntimeException("Invalid index given to addConst: " + idx);
+                } else if (idx <= Byte.MAX_VALUE) {
+                    mv.visitIntInsn(org.objectweb.asm.Opcodes.BIPUSH, idx);
+                } else if (idx <= Short.MAX_VALUE) {
+                    mv.visitIntInsn(org.objectweb.asm.Opcodes.SIPUSH, idx);
+                } else {
+                    mv.visitLdcInsn(Integer.valueOf(idx));
+                }
+        }
+    }
+
+    /** Emits a public no-arg constructor that only calls Object.<init>. */
+    private void makeConstructor(ClassWriter cw) {
+        MethodVisitor mv = cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null);
+        mv.visitCode();
+        mv.visitVarInsn(ALOAD, 0);
+        mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V");
+        mv.visitInsn(RETURN);
+        mv.visitMaxs(1, 1);
+        mv.visitEnd();
+    }
+
+    /** Defines a single generated class from its bytecode. */
+    static class ByteClassLoader extends ClassLoader {
+        private byte[] buf;
+
+        public ByteClassLoader(byte[] buf) {
+            this.buf = buf;
+        }
+
+        public ByteClassLoader(ClassLoader parent, byte[] buf) {
+            super(parent);
+            this.buf = buf;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked") // the bytecode always implements InvokerFunction
+        public Class<InvokerFunction> findClass(String name) {
+            return (Class<InvokerFunction>)defineClass(name, buf, 0, buf.length);
+        }
+
+        public static InvokerFunction getInvokerFunction(String name, byte[] buf) {
+            return instantiate(new ByteClassLoader(buf), name);
+        }
+
+        public static InvokerFunction getInvokerFunction(ClassLoader parent, String name, byte[] buf) {
+            return instantiate(new ByteClassLoader(parent, buf), name);
+        }
+
+        private static InvokerFunction instantiate(ByteClassLoader loader, String name) {
+            try {
+                return loader.findClass(name).newInstance();
+            } catch (InstantiationException e) {
+                throw new RuntimeException(e);
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Fri Apr 19 11:19:30 2013
@@ -324,6 +324,30 @@ public class DataType {
         default: return "Unknown";
         }
     }
+    
+    /**
+     * Maps a Pig type code to the Java class used to represent it. Boolean and numeric
+     * scalar types map to the primitive {@code TYPE} classes, and {@link #NULL} maps to
+     * {@code void}.
+     *
+     * @param dt a Pig type code such as {@link #INTEGER} or {@link #CHARARRAY}
+     * @return the corresponding Java class
+     * @throws RuntimeException if {@code dt} has no corresponding class
+     */
+    public static Class<?> findTypeClass(byte dt) {
+        switch (dt) {
+        // Types represented by Java primitive TYPE classes.
+        case BOOLEAN:   return Boolean.TYPE;
+        case BYTE:      return Byte.TYPE;
+        case INTEGER:   return Integer.TYPE;
+        case LONG:      return Long.TYPE;
+        case FLOAT:     return Float.TYPE;
+        case DOUBLE:    return Double.TYPE;
+        case NULL:      return Void.TYPE;
+        // Scalar types represented by objects.
+        case BIGINTEGER: return BigInteger.class;
+        case BIGDECIMAL: return BigDecimal.class;
+        case DATETIME:   return DateTime.class;
+        case BYTEARRAY:  return DataByteArray.class;
+        case CHARARRAY:
+        case BIGCHARARRAY:
+            return String.class;
+        // Complex and internal types.
+        case MAP:         return Map.class;
+        case INTERNALMAP: return InternalMap.class;
+        case TUPLE:       return Tuple.class;
+        case BAG:         return DataBag.class;
+        case GENERIC_WRITABLECOMPARABLE: return WritableComparable.class;
+        default:
+            throw new RuntimeException("Invalid type has no corresponding class: " + dt);
+        }
+    }
 
     /**
      * Determine whether the this data type is complex.

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Apr 19 11:19:30 2013
@@ -739,6 +739,7 @@ public class PigContext implements Seria
         if (packageImportList.get() == null) {
             ArrayList<String> importlist = new ArrayList<String>();
             importlist.add("");
+            importlist.add("java.lang.");
             importlist.add("org.apache.pig.builtin.");
             importlist.add("org.apache.pig.impl.builtin.");
             packageImportList.set(importlist);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Fri Apr 19 11:19:30 2013
@@ -19,12 +19,16 @@
 package org.apache.pig.newplan.logical.expression;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.InvokerGenerator;
 import org.apache.pig.builtin.Nondeterministic;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
@@ -38,7 +42,12 @@ import org.apache.pig.newplan.OperatorPl
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.newplan.logical.Util;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.parser.LogicalPlanBuilder;
 import org.apache.pig.parser.SourceLocation;
+import org.python.google.common.base.Joiner;
+
+import com.google.common.collect.Lists;
 
 public class UserFuncExpression extends LogicalExpression {
 
@@ -75,6 +84,21 @@ public class UserFuncExpression extends 
         this( plan, funcSpec, args );
         this.viaDefine = viaDefine;
     }
+    
+    /** True until the placeholder FuncSpec has been replaced by one for the resolved Java method. */
+    private boolean lazilyInitializeInvokerFunction = false;
+    /** Argument expressions, kept so their schemas can be inspected when the method is resolved. */
+    private List<LogicalExpression> saveArgsForLater = null;
+    /** Whether the method is static (true) or is called on the first argument (false). */
+    private boolean invokerIsStatic = false;
+    /** Name of the Java method to invoke. */
+    private String funcName = null;
+    /** For a static invoker, the name of the class declaring the method. */
+    private String packageName = null;
+    
+    /**
+     * Creates a user function expression that may call an arbitrary Java method. The
+     * method is resolved later, once argument schemas are available (see
+     * {@link #getFieldSchema()}).
+     *
+     * @param lazilyInitializeInvokerFunction whether {@code funcSpec} is a placeholder to be
+     *        replaced once the target method is resolved
+     * @param invokerIsStatic whether the target method is static
+     * @param packageName class declaring the method (static invokers only)
+     * @param funcName name of the method to invoke
+     */
+    public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec, List<LogicalExpression> args, boolean viaDefine, boolean lazilyInitializeInvokerFunction, boolean invokerIsStatic, String packageName, String funcName) {
+        this( plan, funcSpec, args, viaDefine );
+        this.invokerIsStatic = invokerIsStatic;
+        this.funcName = funcName;
+        this.packageName = packageName;
+        this.lazilyInitializeInvokerFunction = lazilyInitializeInvokerFunction;
+        this.saveArgsForLater = args;
+    }
 
     public FuncSpec getFuncSpec() {
         return mFuncSpec;
@@ -168,7 +192,7 @@ public class UserFuncExpression extends 
         mFuncSpec = funcSpec;
         ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
     }
-
+    
     @Override
     public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
         if (fieldSchema!=null)
@@ -187,16 +211,21 @@ public class UserFuncExpression extends 
             }
         }
 
+        if (lazilyInitializeInvokerFunction) {
+            initializeInvokerFunction();
+        }
+        
         // Since ef only set one time, we never change its value, so we can optimize it by instantiate only once.
         // This significantly optimize the performance of frontend (PIG-1738)
-        if (ef==null)
+        if (ef==null) {
             ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
+        }
 
         ef.setUDFContextSignature(signature);
         Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass());
         Schema translatedInputSchema = Util.translateSchema(inputSchema);
         if(translatedInputSchema != null) {
-    		props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema);
+            props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema);
         }
         // Store inputSchema into the UDF context
         ef.setInputSchema(translatedInputSchema);
@@ -228,6 +257,100 @@ public class UserFuncExpression extends 
         return fieldSchema;
     }
 
+    /**
+     * Resolves the Java method named by this invoker expression against the schemas of its
+     * arguments. It then replaces the placeholder FuncSpec with one that constructs an
+     * {@link InvokerGenerator} for that method.
+     * <p>
+     * For a static invoker, {@code packageName} names the declaring class and every argument
+     * is passed to the method. Otherwise, the first argument is the receiver and its Pig type
+     * determines the class searched. Pig scalars map to Java primitives, so every combination
+     * of primitive and boxed parameter types is tried until a public method matches.
+     *
+     * @throws RuntimeException if an argument schema cannot be computed, the class cannot be
+     *         resolved, or no matching method exists
+     */
+    private void initializeInvokerFunction() {
+        List<LogicalFieldSchema> fieldSchemas = Lists.newArrayList();
+        if (saveArgsForLater != null) {
+            for (LogicalExpression le : saveArgsForLater) {
+                try {
+                    fieldSchemas.add(le.getFieldSchema());
+                } catch (FrontendException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        
+        Class<?> funcClass;
+        
+        if (invokerIsStatic) {
+            try {
+                funcClass = PigContext.resolveClassName(packageName);
+            } catch (IOException e) {
+                throw new RuntimeException("Invoker function name not found: " + packageName, e);
+            }
+        } else {
+            if (fieldSchemas.isEmpty()) {
+                throw new RuntimeException("Non-static invoker function [" + funcName + "] needs at least one argument to be called on");
+            }
+            funcClass = DataType.findTypeClass(fieldSchemas.get(0).type);
+            if (funcClass.isPrimitive()) {
+                funcClass = LogicalPlanBuilder.typeToClass(funcClass);
+            }
+        }
+        
+        // For an instance method the first argument is the receiver, not a parameter.
+        int firstParam = invokerIsStatic ? 0 : 1;
+        Class<?>[] parameterTypes = new Class<?>[fieldSchemas.size() - firstParam];
+        for (int i = firstParam; i < fieldSchemas.size(); i++) {
+            parameterTypes[i - firstParam] = DataType.findTypeClass(fieldSchemas.get(i).type);
+        }
+        
+        List<Integer> primitiveParameters = Lists.newArrayList();
+        for (int i = 0; i < parameterTypes.length; i++) {
+            if (parameterTypes[i].isPrimitive()) {
+                primitiveParameters.add(i);
+            }
+        }
+        
+        // Bit b of the mask decides whether primitive parameter primitiveParameters[b] is boxed.
+        int tries = 1 << primitiveParameters.size();
+        Method m = null;
+        
+        for (int mask = 0; mask < tries && m == null; mask++) {
+            Class<?>[] candidate = parameterTypes.clone();
+            for (int bit = 0; bit < primitiveParameters.size(); bit++) {
+                if ((mask & (1 << bit)) != 0) {
+                    int toBox = primitiveParameters.get(bit);
+                    candidate[toBox] = LogicalPlanBuilder.typeToClass(candidate[toBox]);
+                }
+            }
+            
+            try {
+                // Search with the candidate types. Using the unmodified types would never match
+                // a method that takes boxed parameters.
+                m = funcClass.getMethod(funcName, candidate);
+                parameterTypes = candidate;
+            } catch (SecurityException e) {
+                throw new RuntimeException("Not allowed to access method ["+funcName+"] on class: " + funcClass, e);
+            } catch (NoSuchMethodException e) {
+                // we just continue, as we are searching for a match post-boxing
+            }
+        }
+        
+        if (m == null) {
+            throw new RuntimeException("Given method ["+funcName+"] does not exist on class: " + funcClass
+                    + " (argument types: " + java.util.Arrays.toString(parameterTypes) + ")");
+        }
+        
+        // Comma-separated parameter type names, in the form InvokerGenerator resolves.
+        StringBuilder paramList = new StringBuilder();
+        for (Class<?> param : parameterTypes) {
+            if (paramList.length() > 0) {
+                paramList.append(',');
+            }
+            paramList.append(param.getName());
+        }
+        
+        String[] ctorArgs = new String[] { funcClass.getName(), funcName, paramList.toString() };
+        
+        //TODO need to allow them to define such a function so it can be cached etc (esp. if they reuse)
+        mFuncSpec = new FuncSpec(InvokerGenerator.class.getName(), ctorArgs);
+        lazilyInitializeInvokerFunction = false;
+    }
+
+
+    //TODO need to fix this to use the updated code, it currently won't copy properly if called before it's done (maybe that's ok?)
     @Override
     public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException {
         UserFuncExpression copy =  null;

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Fri Apr 19 11:19:30 2013
@@ -367,7 +367,7 @@ cond : ^( OR cond cond )
 in_eval: ^( IN expr expr+ )
 ;
 
-func_eval: ^( FUNC_EVAL func_name real_arg* )
+func_eval: ^( FUNC_EVAL func_name real_arg* ) | ^( INVOKER_FUNC_EVAL func_name IDENTIFIER real_arg* )
 ;
 
 real_arg : expr | STAR | col_range
@@ -593,7 +593,7 @@ split_branch
    }
 ;
 
-split_otherwise 	: ^( OTHERWISE alias )
+split_otherwise : ^( OTHERWISE alias )
    {
        aliases.add( $alias.name );
    }

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Apr 19 11:19:30 2013
@@ -19,6 +19,7 @@
 package org.apache.pig.parser;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.MalformedURLException;
@@ -28,6 +29,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.antlr.runtime.IntStream;
 import org.antlr.runtime.RecognitionException;
@@ -39,6 +42,7 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.builtin.CubeDimensions;
+import org.apache.pig.builtin.InvokerGenerator;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.RANDOM;
 import org.apache.pig.builtin.RollupDimensions;
@@ -50,6 +54,8 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.streaming.StreamingCommand;
@@ -97,6 +103,8 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
 import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
 
+import com.google.common.collect.Lists;
+
 public class LogicalPlanBuilder {
 
     private LogicalPlan plan = new LogicalPlan();
@@ -1409,6 +1417,34 @@ public class LogicalPlanBuilder {
 
     }
 
+    /**
+     * Builds the logical expression for an {@code INVOKE} call by wrapping
+     * {@link InvokerGenerator} in a {@link UserFuncExpression} together with the target
+     * qualifier, method name and static-ness of the call.
+     *
+     * @param loc source location attached to the returned expression
+     * @param plan expression plan handed to the {@link UserFuncExpression} constructor
+     * @param packageName dotted qualifier that precedes the method name (may be empty)
+     * @param funcName name of the Java method to invoke
+     * @param isStatic true for a static ({@code INVOKE&...}) call, false for an instance call
+     * @param args argument expressions, passed through unchanged
+     * @return the newly built, located UDF expression
+     */
+    LogicalExpression buildInvokerUDF(SourceLocation loc, LogicalExpressionPlan plan, String packageName, String funcName, boolean isStatic, List<LogicalExpression> args) throws RecognitionException {
+        FuncSpec invokerSpec = new FuncSpec(InvokerGenerator.class.getName());
+        // The two boolean literals are forwarded exactly as before; their meaning is defined
+        // by the UserFuncExpression constructor (not visible here).
+        LogicalExpression udfExpr = new UserFuncExpression(plan, invokerSpec, args, false, true, isStatic, packageName, funcName);
+        udfExpr.setLocation(loc);
+        return udfExpr;
+    }
+
+    /**
+     * Returns the boxed wrapper class for a primitive {@code TYPE} class; for example
+     * {@code Integer.TYPE} (i.e. {@code int.class}) maps to {@code Integer.class}.
+     *
+     * @param clazz a primitive class such as {@code Double.TYPE}
+     * @return the wrapper class corresponding to {@code clazz}, e.g. {@code Double.class}
+     * @throws IllegalArgumentException if {@code clazz} is null, {@code void.class}, or not a
+     *         primitive class
+     */
+    public static Class<?> typeToClass(Class<?> clazz) {
+        // Primitive TYPE objects are singletons, so identity comparison is correct here.
+        // Each primitive must map to its own wrapper: float, double and boolean were
+        // previously all mapped to Long.class.
+        if (clazz == Integer.TYPE) {
+            return Integer.class;
+        } else if (clazz == Long.TYPE) {
+            return Long.class;
+        } else if (clazz == Float.TYPE) {
+            return Float.class;
+        } else if (clazz == Double.TYPE) {
+            return Double.class;
+        } else if (clazz == Boolean.TYPE) {
+            return Boolean.class;
+        } else if (clazz == Short.TYPE) {
+            return Short.class;
+        } else if (clazz == Byte.TYPE) {
+            return Byte.class;
+        } else if (clazz == Character.TYPE) {
+            return Character.class;
+        } else {
+            // null, Void.TYPE and reference types all end up here.
+            throw new IllegalArgumentException("Was not given a primitive TYPE class: " + clazz);
+        }
+    }
+
     LogicalExpression buildUDF(SourceLocation loc, LogicalExpressionPlan plan,
             String funcName, List<LogicalExpression> args)
     throws RecognitionException {

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Fri Apr 19 11:19:30 2013
@@ -801,6 +801,11 @@ func_eval[LogicalExpressionPlan plan] re
        SourceLocation loc = new SourceLocation( (PigParserNode)$func_name.start );
        $expr = builder.buildUDF( loc, $plan, $func_name.funcName, args );
    }
+ | ^( INVOKER_FUNC_EVAL package_name=IDENTIFIER function_name=IDENTIFIER is_static=IDENTIFIER ( real_arg[$plan] { args.add( $real_arg.expr ); } )* )
+   {
+       SourceLocation loc = new SourceLocation( (PigParserNode)$function_name );
+       $expr = builder.buildInvokerUDF( loc, $plan, $package_name.text, $function_name.text, Boolean.parseBoolean($is_static.text), args );
+   }
 ;
 
 real_arg [LogicalExpressionPlan plan] returns[LogicalExpression expr]

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Fri Apr 19 11:19:30 2013
@@ -96,6 +96,9 @@ CUBE    : 'CUBE'
 ROLLUP	: 'ROLLUP'
 ;
 
+// Keyword that starts the builtin invoker syntax (parsed by the invoker_func rule in
+// QueryParser.g), e.g. INVOKE(x)concat(x) or INVOKE&Integer.valueOf(x).
+INVOKE  : 'INVOKE'
+;
+
 DISTINCT : 'DISTINCT'
 ;
 
@@ -469,5 +472,8 @@ QMARK : '?'
 ARROBA : '@'
 ;
 
+// In the invoker syntax, '&' directly after INVOKE marks a static call that has no receiver
+// object, e.g. INVOKE&Integer.valueOf(x).
+AMPERSAND : '&'
+;
+
 FAT_ARROW : '=>'
 ;

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Fri Apr 19 11:19:30 2013
@@ -39,6 +39,8 @@ tokens {
     FUNC;
     FUNC_REF;
     FUNC_EVAL;
+    INVOKE;
+    INVOKER_FUNC_EVAL;
     CAST_EXPR;
     COL_RANGE;
     BIN_EXPR;
@@ -90,6 +92,8 @@ import org.apache.pig.parser.PigMacro;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.base.Joiner;
 }
 
 @members {
@@ -717,6 +721,7 @@ cast_expr
           // careful with periods straight after the identifier, as we want those to be projections, not function
           // calls
           | col_ref_without_identifier projection*
+          | invoker_func projection*
           | identifier_plus projection*
           | identifier_plus func_name_suffix? LEFT_PAREN ( real_arg ( COMMA real_arg )* )? RIGHT_PAREN projection* -> ^( FUNC_EVAL identifier_plus func_name_suffix? real_arg* ) projection*
           | func_name_without_columns LEFT_PAREN ( real_arg ( COMMA real_arg )* )? RIGHT_PAREN projection* -> ^( FUNC_EVAL func_name_without_columns real_arg* ) projection*
@@ -726,6 +731,16 @@ cast_expr
           | bracket_expr
 ;
 
+// Builtin invoker call, rewritten to
+//   ^( INVOKER_FUNC_EVAL <qualifier> <method name> <"true"|"false"> real_arg* )
+// Two forms are accepted:
+//   INVOKE&Integer.valueOf(x)  static call: no receiver, staticStr stays "true"
+//   INVOKE(x)concat(x)         instance call: staticStr becomes "false" and the parenthesised
+//                              receiver is the first real_arg in the rewrite, followed by the
+//                              call arguments
+// Every identifier_plus followed by PERIOD is collected into packageStr and joined with "."
+// into a single IDENTIFIER; that IDENTIFIER is the empty string when no qualifier is given.
+// The tree is consumed by the INVOKER_FUNC_EVAL alternative of func_eval in
+// LogicalPlanGenerator.g, which calls LogicalPlanBuilder.buildInvokerUDF.
+invoker_func
+@init {
+    String staticStr = "true"; // flipped to "false" only by the INVOKE( receiver ) form
+    List<String> packageStr = Lists.newArrayList(); // qualifier segments, in source order
+    String methodStr = null; // assigned from methodName while matching the rule
+}
+: INVOKE ( AMPERSAND | LEFT_PAREN real_arg { staticStr = "false"; } RIGHT_PAREN ) ( packageName=identifier_plus PERIOD { packageStr.add($packageName.text); } )* methodName=identifier_plus { methodStr=$methodName.text; } LEFT_PAREN ( real_arg ( COMMA real_arg )* )? RIGHT_PAREN
+              -> ^( INVOKER_FUNC_EVAL IDENTIFIER[Joiner.on(".").join(packageStr)] IDENTIFIER[methodStr] IDENTIFIER[staticStr] real_arg* )
+;
+
 // now we have to deal with parentheses: in an expr, '(' can be the
 // start of a cast, the start of a nested expression or the start of
 // a tuple. We'll ensure parsing is unambiguous by assuming a single
@@ -820,7 +835,7 @@ projection : PERIOD ( col_ref | LEFT_PAR
 // for disambiguation with func_names
 col_ref_without_identifier : GROUP | DOLLARVAR
 ;
- 
+
 col_ref : col_ref_without_identifier | identifier_plus
 ;
 

Added: pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java (added)
+++ pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,89 @@
+package org.apache.pig.builtin;
+
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests for {@link InvokerGenerator}. Each test DEFINEs an alias backed by a Java method, runs
+ * it over a mock.Storage relation and compares the stored output with values computed
+ * directly in Java.
+ */
+public class TestInvokerGenerator {
+    // Per-test state. JUnit creates a fresh instance for every test method, so these are
+    // instance fields rather than statics reassigned from @Before.
+    private PigServer pigServer;
+    // Fixed seed keeps the generated input deterministic across runs.
+    private Random r;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+        r = new Random(42L);
+    }
+
+    @Test
+    public void testConcat() throws Exception {
+        Data data = resetData(pigServer);
+
+        Set<Tuple> inputs = ImmutableSet.of(tuple("a"), tuple("b"), tuple("c"));
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : inputs) {
+            String str = (String) t.get(0);
+            expected.add(tuple(str.concat(str)));
+        }
+
+        data.set("foo", Utils.getSchemaFromString("x:chararray"), inputs);
+
+        pigServer.registerQuery("define concat InvokerGenerator('java.lang.String','concat','String');");
+        pigServer.registerQuery("a = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate concat($0, $0);");
+        pigServer.registerQuery("store b into 'bar' using mock.Storage();");
+
+        assertSameTuples(expected, data.get("bar"));
+    }
+
+    @Test
+    public void testValueOf() throws Exception {
+        Data data = resetData(pigServer);
+
+        // 1000 distinct random integers, rendered as strings.
+        Set<Tuple> inputs = Sets.newHashSet();
+        while (inputs.size() < 1000) {
+            inputs.add(tuple(Integer.toString(r.nextInt())));
+        }
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : inputs) {
+            String str = (String) t.get(0);
+            expected.add(tuple(Integer.valueOf(str)));
+        }
+
+        data.set("foo", Utils.getSchemaFromString("x:chararray"), inputs);
+
+        pigServer.registerQuery("define valueOf InvokerGenerator('java.lang.Integer','valueOf','String');");
+        pigServer.registerQuery("a = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate valueOf($0);");
+        pigServer.registerQuery("store b into 'bar' using mock.Storage();");
+
+        assertSameTuples(expected, data.get("bar"));
+    }
+
+    /**
+     * Asserts that {@code results} holds exactly the tuples in {@code expected}, each once.
+     * Matched tuples are removed from {@code expected}, so callers must pass a mutable set.
+     */
+    private static void assertSameTuples(Set<Tuple> expected, List<Tuple> results) {
+        assertEquals(expected.size(), results.size());
+        for (Tuple t : results) {
+            assertTrue("unexpected tuple: " + t, expected.remove(t));
+        }
+        assertEquals(0, expected.size());
+    }
+}
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,128 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * End-to-end tests for the builtin invoker syntax. {@code invoke(obj)method(args)} calls an
+ * instance method on {@code obj}; {@code invoke&Class.method(args)} calls a static method.
+ */
+public class TestBuiltinInvoker {
+    // Immutable input fixtures: never mutated, so safe to share across tests.
+    private static final Set<Tuple> chardata = ImmutableSet.of(tuple("a"), tuple("b"), tuple("c"));
+    private static final Set<Tuple> charintdata = ImmutableSet.of(tuple("1"), tuple("2"), tuple("3"));
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    // Per-test state, rebuilt in setUp(). These are instance fields because JUnit creates a
+    // new instance of this class for every test method.
+    private PigServer pigServer;
+    private Data data;
+    private Random r;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+
+        data = resetData(pigServer);
+        data.set("chardata", Utils.getSchemaFromString("x:chararray"), chardata);
+        data.set("charintdata", Utils.getSchemaFromString("x:chararray"), charintdata);
+
+        // Fixed seed so testStringSize generates the same input on every run.
+        r = new Random(42L);
+    }
+
+    @Test
+    public void testConcat() throws Exception {
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : chardata) {
+            String str = (String) t.get(0);
+            expected.add(mTupleFactory.newTuple(str + str));
+        }
+
+        pigServer.registerQuery("a = load 'chardata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke(x)concat(x);");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        List<Tuple> results = data.get("res");
+        dataIsEqual(expected, results);
+    }
+
+    @Test
+    public void testValueOf() throws Exception {
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : charintdata) {
+            String str = (String) t.get(0);
+            expected.add(mTupleFactory.newTuple(Integer.valueOf(str)));
+        }
+
+        pigServer.registerQuery("a = load 'charintdata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke&Integer.valueOf(x);");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        List<Tuple> results = data.get("res");
+        dataIsEqual(expected, results);
+    }
+
+    @Test
+    public void testManyConcat() throws Exception {
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : chardata) {
+            String str = (String) t.get(0);
+            expected.add(mTupleFactory.newTuple(str + str + str + str));
+        }
+
+        pigServer.registerQuery("a = load 'chardata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke(invoke(invoke(x)concat(x))concat(x))concat(x);");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        List<Tuple> results = data.get("res");
+        dataIsEqual(expected, results);
+    }
+
+    @Test
+    public void testTupleSize() throws Exception {
+        pigServer.registerQuery("a = load 'chardata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke(TOTUPLE(x))size();");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        List<Tuple> results = data.get("res");
+        assertEquals(3, results.size());
+        for (Tuple t : results) {
+            assertEquals(Integer.valueOf(1), (Integer) t.get(0));
+        }
+    }
+
+    @Test
+    public void testStringSize() throws Exception {
+        Set<Tuple> input = Sets.newHashSet();
+        Set<Tuple> expected = Sets.newHashSet();
+        for (int i = 0; i < 1000; i++) {
+            String val = Integer.toString(r.nextInt());
+            input.add(tuple(val));
+            expected.add(tuple(val, val.length()));
+        }
+        data.set("foo", Utils.getSchemaFromString("x:chararray"), input);
+
+        pigServer.registerQuery("a = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate $0, invoke($0)length();");
+        pigServer.registerQuery("store b into 'bar' using mock.Storage();");
+
+        dataIsEqual(expected, data.get("bar"));
+    }
+
+    /**
+     * Asserts that {@code results} holds exactly the tuples in {@code expected}, each once.
+     * Matched tuples are removed from {@code expected}, so callers must pass a mutable set.
+     */
+    private static void dataIsEqual(Set<Tuple> expected, Collection<Tuple> results) {
+        assertEquals(expected.size(), results.size());
+        for (Tuple t : results) {
+            assertTrue("unexpected tuple: " + t, expected.remove(t));
+        }
+        assertEquals(0, expected.size());
+    }
+}