You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2013/04/19 13:19:31 UTC
svn commit: r1469783 - in /pig/trunk: ./ ivy/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/
src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/parser/
test/org/apache/pig/builtin/ test/org/apache/pig/test/
Author: jcoveney
Date: Fri Apr 19 11:19:30 2013
New Revision: 1469783
URL: http://svn.apache.org/r1469783
Log:
PIG-3198: Let users use any function from PigType -> PigType as if it were builtin (jcoveney)
Added:
pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java
pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java
pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java
pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
pig/trunk/src/org/apache/pig/data/DataType.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryLexer.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 19 11:19:30 2013
@@ -28,6 +28,8 @@ PIG-3174: Remove rpm and deb artifacts
IMPROVEMENTS
+PIG-3198: Let users use any function from PigType -> PigType as if it were builtin (jcoveney)
+
PIG-3268: Case statement support (cheolsoo)
PIG-3269: In operator support (cheolsoo)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Apr 19 11:19:30 2013
@@ -329,6 +329,7 @@
<include name="guava-${guava.version}.jar"/>
<include name="automaton-${automaton.version}.jar"/>
<include name="jansi-${jansi.version}.jar"/>
+ <include name="asm*.jar"/>
</patternset>
</fileset>
@@ -346,6 +347,7 @@
<include name="log4j*.jar"/>
<include name="slf4j*.jar"/>
<include name="jsp-api*.jar"/>
+ <include name="asm*.jar"/>
</patternset>
</fileset>
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Fri Apr 19 11:19:30 2013
@@ -218,6 +218,7 @@
<dependency org="dk.brics.automaton" name="automaton" rev="1.11-8" conf="compile->default"/>
<dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
+ <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/>
<dependency org="org.apache.hbase" name="hbase" rev="${hbase.version}" conf="compile->master">
<artifact name="hbase" type="jar"/>
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Fri Apr 19 11:19:30 2013
@@ -85,3 +85,4 @@ aopalliance.version=1.0
jsr311-api.version=1.1.1
mockito.version=1.8.4
jansi.version=1.9
+asm.version=3.3.1
Added: pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/InvokerFunction.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,9 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * Single-method contract for the classes that {@link InvokerGenerator} generates
+ * at runtime: each implementation pulls its operands out of a {@link Tuple} and
+ * returns the result of the wrapped Java method call.
+ */
+public interface InvokerFunction {
+    /**
+     * Evaluates the wrapped call against the given tuple.
+     *
+     * @param input tuple holding the operands of the call
+     * @return the (boxed) result of the call
+     * @throws IOException if reading a field from the tuple fails
+     */
+    Object eval(Tuple input) throws IOException;
+}
Added: pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/InvokerGenerator.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,354 @@
+package org.apache.pig.builtin;
+
+import static org.objectweb.asm.Opcodes.ACC_PUBLIC;
+import static org.objectweb.asm.Opcodes.ACC_SUPER;
+import static org.objectweb.asm.Opcodes.ALOAD;
+import static org.objectweb.asm.Opcodes.ARETURN;
+import static org.objectweb.asm.Opcodes.ASTORE;
+import static org.objectweb.asm.Opcodes.CHECKCAST;
+import static org.objectweb.asm.Opcodes.ICONST_0;
+import static org.objectweb.asm.Opcodes.ICONST_1;
+import static org.objectweb.asm.Opcodes.ICONST_2;
+import static org.objectweb.asm.Opcodes.ICONST_3;
+import static org.objectweb.asm.Opcodes.ICONST_4;
+import static org.objectweb.asm.Opcodes.ICONST_5;
+import static org.objectweb.asm.Opcodes.INVOKEINTERFACE;
+import static org.objectweb.asm.Opcodes.INVOKESPECIAL;
+import static org.objectweb.asm.Opcodes.INVOKESTATIC;
+import static org.objectweb.asm.Opcodes.INVOKEVIRTUAL;
+import static org.objectweb.asm.Opcodes.RETURN;
+import static org.objectweb.asm.Opcodes.V1_6;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.objectweb.asm.ClassWriter;
+import org.objectweb.asm.MethodVisitor;
+import org.python.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+
+//TODO need to add support for ANY Pig type!
+//TODO statically cache the generated code based on the input Strings
+/**
+ * EvalFunc that calls an arbitrary Java method whose argument and return types
+ * are Strings or (boxed) int/long/float/double/boolean values.
+ *
+ * On first use it resolves the target method reflectively and then uses ASM to
+ * generate a small {@link InvokerFunction} class that performs the call directly.
+ * The generated {@code eval(Tuple)} reads its operands from the input tuple: for
+ * an instance method, field 0 is the receiver and the arguments follow; for a
+ * static method the arguments start at field 0.
+ */
+public class InvokerGenerator extends EvalFunc<Object> {
+    private String className_;
+    private String methodName_;
+    private String[] argumentTypes_;
+
+    private boolean isInitialized = false;
+
+    private InvokerFunction generatedFunction;
+    private Schema outputSchema;
+
+    private static int uniqueId = 0;
+
+    /** Supported Java return types and the Pig type code each one maps to. */
+    private static final Map<Class<?>, Byte> returnTypeMap = new HashMap<Class<?>, Byte>();
+
+    /** Primitive TYPE class to wrapper class, and wrapper class back to primitive. */
+    private static final Map<Class<?>, Class<?>> inverseTypeMap = new HashMap<Class<?>, Class<?>>();
+
+    /** JVM descriptor letter of each supported primitive type. */
+    private static final Map<Class<?>, String> primitiveSignature = new HashMap<Class<?>, String>();
+
+    /** Short-hand and fully qualified type names accepted in the argument list. */
+    private static final Map<String, Class<?>> nameToClassObjectMap = new HashMap<String, Class<?>>();
+
+    static {
+        returnTypeMap.put(String.class, DataType.CHARARRAY);
+        returnTypeMap.put(Integer.class, DataType.INTEGER);
+        returnTypeMap.put(Long.class, DataType.LONG);
+        returnTypeMap.put(Float.class, DataType.FLOAT);
+        returnTypeMap.put(Double.class, DataType.DOUBLE);
+        returnTypeMap.put(Boolean.class, DataType.BOOLEAN);
+        //returnTypeMap.put(byte[].class, DataType.BYTEARRAY);
+        returnTypeMap.put(Integer.TYPE, DataType.INTEGER);
+        returnTypeMap.put(Long.TYPE, DataType.LONG);
+        returnTypeMap.put(Float.TYPE, DataType.FLOAT);
+        returnTypeMap.put(Double.TYPE, DataType.DOUBLE);
+        returnTypeMap.put(Boolean.TYPE, DataType.BOOLEAN);
+
+        inverseTypeMap.put(Integer.class, Integer.TYPE);
+        inverseTypeMap.put(Long.class, Long.TYPE);
+        inverseTypeMap.put(Float.class, Float.TYPE);
+        inverseTypeMap.put(Double.class, Double.TYPE);
+        inverseTypeMap.put(Boolean.class, Boolean.TYPE);
+        inverseTypeMap.put(Integer.TYPE, Integer.class);
+        inverseTypeMap.put(Long.TYPE, Long.class);
+        inverseTypeMap.put(Float.TYPE, Float.class);
+        inverseTypeMap.put(Double.TYPE, Double.class);
+        inverseTypeMap.put(Boolean.TYPE, Boolean.class);
+
+        primitiveSignature.put(Integer.TYPE, "I");
+        primitiveSignature.put(Long.TYPE, "J");
+        primitiveSignature.put(Float.TYPE, "F");
+        primitiveSignature.put(Double.TYPE, "D");
+        primitiveSignature.put(Boolean.TYPE, "Z");
+
+        nameToClassObjectMap.put("String", String.class);
+        nameToClassObjectMap.put("Integer", Integer.class);
+        nameToClassObjectMap.put("int", Integer.TYPE);
+        nameToClassObjectMap.put("Long", Long.class);
+        nameToClassObjectMap.put("long", Long.TYPE);
+        nameToClassObjectMap.put("Float", Float.class);
+        nameToClassObjectMap.put("float", Float.TYPE);
+        nameToClassObjectMap.put("Double", Double.class);
+        nameToClassObjectMap.put("double", Double.TYPE);
+        nameToClassObjectMap.put("Boolean", Boolean.class);
+        nameToClassObjectMap.put("boolean", Boolean.TYPE);
+        //nameToClassObjectMap.put("byte[]", byte[].class);
+        nameToClassObjectMap.put("java.lang.String", String.class);
+        nameToClassObjectMap.put("java.lang.Integer", Integer.class);
+        nameToClassObjectMap.put("java.lang.Long", Long.class);
+        nameToClassObjectMap.put("java.lang.Float", Float.class);
+        nameToClassObjectMap.put("java.lang.Double", Double.class);
+        nameToClassObjectMap.put("java.lang.Boolean", Boolean.class);
+    }
+
+    /**
+     * @param className     name of the class declaring the method (resolved through PigContext)
+     * @param methodName    name of the method to call
+     * @param argumentTypes comma-separated parameter type names; empty, blank or
+     *                      null means the method takes no parameters
+     */
+    public InvokerGenerator(String className, String methodName, String argumentTypes) {
+        className_ = className;
+        methodName_ = methodName;
+        if (argumentTypes == null || argumentTypes.trim().isEmpty()) {
+            argumentTypes_ = new String[0];
+        } else {
+            String[] names = argumentTypes.split(",");
+            // tolerate "int, int": surrounding blanks are not part of a type name
+            for (int i = 0; i < names.length; i++) {
+                names[i] = names[i].trim();
+            }
+            argumentTypes_ = names;
+        }
+    }
+
+    /**
+     * Runs the generated invoker on the input tuple, generating it on first use.
+     */
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (!isInitialized) {
+            initialize();
+        }
+
+        return generatedFunction.eval(input);
+    }
+
+    /**
+     * Returns a one-field schema typed after the target method's return type.
+     * The input schema is not consulted.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        if (!isInitialized) {
+            initialize();
+        }
+
+        return outputSchema;
+    }
+
+    private static int getUniqueId() {
+        return uniqueId++;
+    }
+
+    /**
+     * Resolves the target class and method, derives the output schema from the
+     * return type and generates the invoker class.
+     *
+     * @throws RuntimeException if the class or method cannot be found or accessed,
+     *                          or if the method's return type is not supported
+     */
+    protected void initialize() {
+        Class<?> clazz;
+        try {
+            clazz = PigContext.resolveClassName(className_); //TODO I should probably be using this for all of the Class<?> resolution
+        } catch (IOException e) {
+            throw new RuntimeException("Given className not found: " + className_, e);
+        }
+
+        Class<?>[] arguments = getArgumentClassArray(argumentTypes_);
+
+        Method method;
+        try {
+            method = clazz.getMethod(methodName_, arguments); //must match exactly
+        } catch (SecurityException e) {
+            throw new RuntimeException("Not allowed to call given method["+methodName_+"] on class ["+className_+"] with arguments: " + Arrays.toString(argumentTypes_), e);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("Given method name ["+methodName_+"] does not exist on class ["+className_+"] with arguments: " + Arrays.toString(argumentTypes_), e);
+        }
+        boolean isStatic = Modifier.isStatic(method.getModifiers());
+
+        Class<?> returnClazz = method.getReturnType();
+        // returnTypeMap holds both the primitive and the boxed classes, so a
+        // direct lookup covers either form; unsupported types (void included) miss.
+        Byte type = returnTypeMap.get(returnClazz);
+
+        //TODO add functionality so that if the user pairs this with a cast that it will let you return object
+        if (type == null) {
+            throw new RuntimeException("Function returns invalid type: " + returnClazz);
+        }
+
+        outputSchema = new Schema();
+        outputSchema.add(new Schema.FieldSchema(null, type));
+
+        generatedFunction = generateInvokerFunction("InvokerFunction_"+getUniqueId(), method, isStatic, arguments);
+
+        isInitialized = true;
+    }
+
+    /**
+     * Turns parameter type names into Class objects, trying the built-in name
+     * table first and PigContext class resolution second.
+     */
+    private Class<?>[] getArgumentClassArray(String[] argumentTypes) {
+        Class<?>[] arguments = new Class<?>[argumentTypes.length];
+        for (int i = 0; i < argumentTypes.length; i++) {
+            try {
+                arguments[i] = nameToClassObjectMap.get(argumentTypes[i]);
+                if (arguments[i] == null) {
+                    arguments[i] = PigContext.resolveClassName(argumentTypes[i]);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to find class in PigContext: " + argumentTypes[i], e);
+            }
+        }
+        return arguments;
+    }
+
+    /** Generates the invoker bytecode, loads it and returns an instance of it. */
+    private InvokerFunction generateInvokerFunction(String className, Method method, boolean isStatic, Class<?>[] arguments) {
+        byte[] byteCode = generateInvokerFunctionBytecode(className, method, isStatic, arguments);
+
+        return ByteClassLoader.getInvokerFunction(className, byteCode);
+    }
+
+    /**
+     * Emits a class named {@code className} implementing InvokerFunction whose
+     * eval(Tuple) casts the tuple fields into locals, pushes them (unboxing
+     * primitives), invokes {@code method} and returns the boxed result.
+     */
+    private byte[] generateInvokerFunctionBytecode(String className, Method method, boolean isStatic, Class<?>[] arguments) {
+        boolean isInterface = method.getDeclaringClass().isInterface();
+
+        // COMPUTE_MAXS: the operand stack needs one slot per pushed argument (two
+        // for long/double), so a fixed maximum is wrong for anything but the
+        // smallest signatures; let ASM compute max stack and max locals.
+        ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_MAXS);
+        cw.visit(V1_6, ACC_PUBLIC + ACC_SUPER, className, null, "java/lang/Object", new String[] { "org/apache/pig/builtin/InvokerFunction" });
+
+        makeConstructor(cw);
+
+        MethodVisitor mv = cw.visitMethod(ACC_PUBLIC, "eval", "(Lorg/apache/pig/data/Tuple;)Ljava/lang/Object;", null, new String[] { "java/io/IOException" });
+        mv.visitCode();
+
+        // local 0 is 'this', local 1 is the Tuple; operands are stored from local 2 on
+        int next = 2;
+        //this will get the arguments from the Tuple, cast them, and astore them
+        int begin = 0;
+        if (!isStatic) {
+            loadAndStoreArgument(mv, begin++, next++, getMethodStyleName(method.getDeclaringClass()));
+        }
+
+        for (int i = 0; i < arguments.length; i++) {
+            loadAndStoreArgument(mv, i + begin, next++, getMethodStyleName(getObjectVersion(arguments[i])));
+        }
+
+        //puts the arguments on the stack
+        next = 2;
+
+        if (!isStatic) {
+            mv.visitVarInsn(ALOAD, next++); //put the method receiver on the stack
+        }
+
+        for (Class<?> arg : arguments) {
+            mv.visitVarInsn(ALOAD, next++);
+            unboxIfPrimitive(mv, arg);
+        }
+        String signature = buildSignatureString(arguments, method.getReturnType());
+        mv.visitMethodInsn(isStatic ? INVOKESTATIC : isInterface ? INVOKEINTERFACE : INVOKEVIRTUAL, getMethodStyleName(method.getDeclaringClass()), method.getName(), signature);
+        boxIfPrimitive(mv, method.getReturnType());
+        mv.visitInsn(ARETURN);
+        // the call is still required; the values are ignored under COMPUTE_MAXS
+        mv.visitMaxs(0, 0);
+        mv.visitEnd();
+
+        cw.visitEnd();
+
+        return cw.toByteArray();
+    }
+
+    /** Builds the JVM method descriptor, e.g. {@code (ILjava/lang/String;)J}. */
+    private String buildSignatureString(Class<?>[] arguments, Class<?> returnClazz) {
+        StringBuilder sig = new StringBuilder("(");
+        for (Class<?> arg : arguments) {
+            sig.append(getDescriptor(arg));
+        }
+        sig.append(")");
+        sig.append(getDescriptor(returnClazz));
+        return sig.toString();
+    }
+
+    /** Descriptor of one type: the primitive letter, or {@code L<internal name>;}. */
+    private String getDescriptor(Class<?> clazz) {
+        if (clazz.isPrimitive()) {
+            return getMethodStyleName(clazz);
+        }
+        return "L" + getMethodStyleName(clazz) + ";";
+    }
+
+    /** Returns the wrapper class of a primitive type, or the class itself otherwise. */
+    private Class<?> getObjectVersion(Class<?> clazz) {
+        if (clazz.isPrimitive()) {
+            return inverseTypeMap.get(clazz);
+        }
+        return clazz;
+    }
+
+    /**
+     * Returns the JVM internal name of a reference type (dots replaced by
+     * slashes) or the descriptor letter of a supported primitive type.
+     */
+    private String getMethodStyleName(Class<?> clazz) {
+        if (!clazz.isPrimitive()) {
+            // getName(), not getCanonicalName(): nested classes must keep the
+            // '$' separator (a/b/Outer$Inner) to be valid internal names.
+            return clazz.getName().replace('.', '/');
+        }
+        return primitiveSignature.get(clazz);
+    }
+
+    /** Emits a call to {@code Wrapper.valueOf(primitive)} when clazz is primitive. */
+    private void boxIfPrimitive(MethodVisitor mv, Class<?> clazz) {
+        if (!clazz.isPrimitive()) {
+            return;
+        }
+        String boxedClass = getMethodStyleName(inverseTypeMap.get(clazz));
+        mv.visitMethodInsn(INVOKESTATIC, boxedClass, "valueOf", "("+getMethodStyleName(clazz)+")L"+boxedClass+";");
+    }
+
+    /** Emits a call to {@code wrapper.<type>Value()} when clazz is primitive. */
+    private void unboxIfPrimitive(MethodVisitor mv, Class<?> clazz) {
+        if (!clazz.isPrimitive()) {
+            return;
+        }
+        String methodName = clazz.getSimpleName()+"Value";
+        mv.visitMethodInsn(INVOKEVIRTUAL, getMethodStyleName(inverseTypeMap.get(clazz)), methodName, "()"+getMethodStyleName(clazz));
+    }
+
+    /**
+     * Emits {@code local[storeIdx] = (castName) tuple.get(loadIdx)}.
+     */
+    private void loadAndStoreArgument(MethodVisitor mv, int loadIdx, int storeIdx, String castName) {
+        mv.visitVarInsn(ALOAD, 1); //loads the 1st argument
+        addConst(mv, loadIdx);
+        mv.visitMethodInsn(INVOKEINTERFACE, "org/apache/pig/data/Tuple", "get", "(I)Ljava/lang/Object;");
+        mv.visitTypeInsn(CHECKCAST, castName);
+        mv.visitVarInsn(ASTORE, storeIdx);
+    }
+
+    /**
+     * Pushes the int constant {@code idx}: the dedicated ICONST_n opcode for 0..5,
+     * a constant-pool load for anything larger.
+     *
+     * @throws RuntimeException if idx is negative
+     */
+    private void addConst(MethodVisitor mv, int idx) {
+        switch (idx) {
+            case 0: mv.visitInsn(ICONST_0); break;
+            case 1: mv.visitInsn(ICONST_1); break;
+            case 2: mv.visitInsn(ICONST_2); break;
+            case 3: mv.visitInsn(ICONST_3); break;
+            case 4: mv.visitInsn(ICONST_4); break;
+            case 5: mv.visitInsn(ICONST_5); break;
+            default:
+                if (idx < 0) {
+                    throw new RuntimeException("Invalid index given to addConst: " + idx);
+                }
+                // methods with more than six operands need indexes beyond ICONST_5
+                mv.visitLdcInsn(Integer.valueOf(idx));
+        }
+    }
+
+    /** Emits a public no-arg constructor that only calls {@code Object.<init>()}. */
+    private void makeConstructor(ClassWriter cw) {
+        MethodVisitor mv = cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null);
+        mv.visitCode();
+        mv.visitVarInsn(ALOAD, 0);
+        mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V");
+        mv.visitInsn(RETURN);
+        mv.visitMaxs(1, 1);
+        mv.visitEnd();
+    }
+
+    /** Class loader that defines exactly one class from an in-memory byte array. */
+    static class ByteClassLoader extends ClassLoader {
+        private byte[] buf;
+
+        public ByteClassLoader(byte[] buf) {
+            this.buf = buf;
+        }
+
+        // The buffer always holds a class generated to implement InvokerFunction,
+        // which is what makes the unchecked cast acceptable.
+        @SuppressWarnings("unchecked")
+        public Class<InvokerFunction> findClass(String name) {
+            return (Class<InvokerFunction>)defineClass(name, buf, 0, buf.length);
+        }
+
+        /** Defines the class held in {@code buf} and returns a new instance of it. */
+        public static InvokerFunction getInvokerFunction(String name, byte[] buf) {
+            try {
+                return new ByteClassLoader(buf).findClass(name).newInstance();
+            } catch (InstantiationException e) {
+                throw new RuntimeException(e);
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Fri Apr 19 11:19:30 2013
@@ -324,6 +324,30 @@ public class DataType {
default: return "Unknown";
}
}
+
+    /**
+     * Returns the Java class that corresponds to a Pig type code. Boolean, byte,
+     * integer, long, float and double map to their primitive TYPE classes and
+     * NULL maps to {@code Void.TYPE}; the remaining supported codes map to
+     * reference classes.
+     *
+     * @param dt Pig type code
+     * @return the matching Java class
+     * @throws RuntimeException if the code has no corresponding class
+     */
+    public static Class<?> findTypeClass(byte dt) {
+        final Class<?> typeClass;
+        switch (dt) {
+        case NULL:
+            typeClass = Void.TYPE;
+            break;
+        case BOOLEAN:
+            typeClass = Boolean.TYPE;
+            break;
+        case BYTE:
+            typeClass = Byte.TYPE;
+            break;
+        case INTEGER:
+            typeClass = Integer.TYPE;
+            break;
+        case LONG:
+            typeClass = Long.TYPE;
+            break;
+        case FLOAT:
+            typeClass = Float.TYPE;
+            break;
+        case DOUBLE:
+            typeClass = Double.TYPE;
+            break;
+        case BIGINTEGER:
+            typeClass = BigInteger.class;
+            break;
+        case BIGDECIMAL:
+            typeClass = BigDecimal.class;
+            break;
+        case DATETIME:
+            typeClass = DateTime.class;
+            break;
+        case BYTEARRAY:
+            typeClass = DataByteArray.class;
+            break;
+        case BIGCHARARRAY:
+        case CHARARRAY:
+            // both character-array codes are represented by String
+            typeClass = String.class;
+            break;
+        case MAP:
+            typeClass = Map.class;
+            break;
+        case INTERNALMAP:
+            typeClass = InternalMap.class;
+            break;
+        case TUPLE:
+            typeClass = Tuple.class;
+            break;
+        case BAG:
+            typeClass = DataBag.class;
+            break;
+        case GENERIC_WRITABLECOMPARABLE:
+            typeClass = WritableComparable.class;
+            break;
+        default:
+            throw new RuntimeException("Invalid type has no corresponding class: " + dt);
+        }
+        return typeClass;
+    }
/**
* Determine whether the this data type is complex.
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Apr 19 11:19:30 2013
@@ -739,6 +739,7 @@ public class PigContext implements Seria
if (packageImportList.get() == null) {
ArrayList<String> importlist = new ArrayList<String>();
importlist.add("");
+ importlist.add("java.lang.");
importlist.add("org.apache.pig.builtin.");
importlist.add("org.apache.pig.impl.builtin.");
packageImportList.set(importlist);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Fri Apr 19 11:19:30 2013
@@ -19,12 +19,16 @@
package org.apache.pig.newplan.logical.expression;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.InvokerGenerator;
import org.apache.pig.builtin.Nondeterministic;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
@@ -38,7 +42,12 @@ import org.apache.pig.newplan.OperatorPl
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.parser.LogicalPlanBuilder;
import org.apache.pig.parser.SourceLocation;
+import org.python.google.common.base.Joiner;
+
+import com.google.common.collect.Lists;
public class UserFuncExpression extends LogicalExpression {
@@ -75,6 +84,21 @@ public class UserFuncExpression extends
this( plan, funcSpec, args );
this.viaDefine = viaDefine;
}
+
+ private boolean lazilyInitializeInvokerFunction = false;
+ private List<LogicalExpression> saveArgsForLater = null;
+ private boolean invokerIsStatic = false;
+ private String funcName = null;
+ private String packageName = null;
+
+    /**
+     * Creates a function expression for an INVOKE call whose concrete FuncSpec is
+     * worked out later, once the argument schemas are available.
+     *
+     * @param plan                            plan this expression belongs to
+     * @param funcSpec                        initial function spec
+     * @param args                            argument expressions; also kept for the deferred resolution
+     * @param viaDefine                       forwarded to the four-argument constructor
+     * @param lazilyInitializeInvokerFunction when true, getFieldSchema() first calls initializeInvokerFunction()
+     * @param invokerIsStatic                 true for a static method call, false when the first argument is the receiver
+     * @param packageName                     class name to resolve for a static call
+     * @param funcName                        name of the Java method to call
+     */
+    public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec, List<LogicalExpression> args, boolean viaDefine, boolean lazilyInitializeInvokerFunction, boolean invokerIsStatic, String packageName, String funcName) {
+        this( plan, funcSpec, args, viaDefine );
+        // what to call ...
+        this.packageName = packageName;
+        this.funcName = funcName;
+        this.invokerIsStatic = invokerIsStatic;
+        // ... and what is needed to resolve it later
+        this.lazilyInitializeInvokerFunction = lazilyInitializeInvokerFunction;
+        this.saveArgsForLater = args;
+    }
public FuncSpec getFuncSpec() {
return mFuncSpec;
@@ -168,7 +192,7 @@ public class UserFuncExpression extends
mFuncSpec = funcSpec;
ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
}
-
+
@Override
public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
if (fieldSchema!=null)
@@ -187,16 +211,21 @@ public class UserFuncExpression extends
}
}
+ if (lazilyInitializeInvokerFunction) {
+ initializeInvokerFunction();
+ }
+
// Since ef only set one time, we never change its value, so we can optimize it by instantiate only once.
// This significantly optimize the performance of frontend (PIG-1738)
- if (ef==null)
+ if (ef==null) {
ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
+ }
ef.setUDFContextSignature(signature);
Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass());
Schema translatedInputSchema = Util.translateSchema(inputSchema);
if(translatedInputSchema != null) {
- props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema);
+ props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema);
}
// Store inputSchema into the UDF context
ef.setInputSchema(translatedInputSchema);
@@ -228,6 +257,100 @@ public class UserFuncExpression extends
return fieldSchema;
}
+    /**
+     * Resolves the Java method behind an INVOKE expression and replaces mFuncSpec
+     * with an InvokerGenerator spec carrying the class name, the method name and
+     * the comma-separated parameter type names of the matched signature.
+     *
+     * Pig types that map to Java primitives may belong to a method declared with
+     * either the primitive or the boxed parameter type, so every primitive/boxed
+     * combination is tried, the all-primitive signature first.
+     *
+     * @throws RuntimeException if an argument schema cannot be computed, the class
+     *                          cannot be resolved, or no matching public method exists
+     */
+    private void initializeInvokerFunction() {
+        List<LogicalFieldSchema> fieldSchemas = Lists.newArrayList();
+        for (LogicalExpression le : saveArgsForLater) {
+            try {
+                fieldSchemas.add(le.getFieldSchema());
+            } catch (FrontendException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Class<?> funcClass;
+
+        if (invokerIsStatic) {
+            try {
+                funcClass = PigContext.resolveClassName(packageName);
+            } catch (IOException e) {
+                throw new RuntimeException("Invoker function name not found: " + packageName, e);
+            }
+        } else {
+            // instance call: the first argument is the receiver, its type names the class
+            funcClass = DataType.findTypeClass(fieldSchemas.get(0).type);
+            if (funcClass.isPrimitive()) {
+                funcClass = LogicalPlanBuilder.typeToClass(funcClass);
+            }
+        }
+
+        // skip the receiver, if any, when collecting the parameter types
+        int firstParameter = invokerIsStatic ? 0 : 1;
+        Class<?>[] parameterTypes = new Class<?>[fieldSchemas.size() - firstParameter];
+        for (int i = firstParameter; i < fieldSchemas.size(); i++) {
+            parameterTypes[i - firstParameter] = DataType.findTypeClass(fieldSchemas.get(i).type);
+        }
+
+        List<Integer> primitiveParameters = Lists.newArrayList();
+
+        for (int i = 0; i < parameterTypes.length; i++) {
+            if (parameterTypes[i].isPrimitive()) {
+                primitiveParameters.add(i);
+            }
+        }
+
+        // each bit of the mask decides whether one primitive parameter is boxed
+        int tries = 1 << primitiveParameters.size();
+
+        Method m = null;
+        Class<?>[] matchedParameterTypes = null;
+
+        for (int mask = 0; mask < tries && m == null; mask++) {
+            Class<?>[] candidateTypes = parameterTypes.clone();
+
+            for (int bit = 0; bit < primitiveParameters.size(); bit++) {
+                if ((mask & (1 << bit)) != 0) {
+                    int toFlip = primitiveParameters.get(bit);
+                    candidateTypes[toFlip] = LogicalPlanBuilder.typeToClass(candidateTypes[toFlip]);
+                }
+            }
+
+            try {
+                // look up the candidate signature (not the unmodified primitive
+                // one), otherwise the boxed variants are never actually tried
+                m = funcClass.getMethod(funcName, candidateTypes);
+                matchedParameterTypes = candidateTypes;
+            } catch (SecurityException e) {
+                throw new RuntimeException("Not allowed to access method ["+funcName+"] on class: " + funcClass, e);
+            } catch (NoSuchMethodException e) {
+                // we just continue, as we are searching for a match post-boxing
+            }
+        }
+
+        if (m == null) {
+            throw new RuntimeException("Given method ["+funcName+"] does not exist on class: " + funcClass);
+        }
+
+        List<String> params = Lists.newArrayList();
+        for (Class<?> param : matchedParameterTypes) {
+            params.add(param.getName());
+        }
+
+        String[] ctorArgs = new String[3];
+        ctorArgs[0] = funcClass.getName();
+        ctorArgs[1] = funcName;
+        ctorArgs[2] = Joiner.on(",").join(params);
+
+        //TODO need to allow them to define such a function so it can be cached etc (esp. if they reuse)
+        mFuncSpec = new FuncSpec(InvokerGenerator.class.getName(), ctorArgs);
+        lazilyInitializeInvokerFunction = false;
+    }
+
+
+ //TODO need to fix this to use the updated code, it currently won't copy properly if called before it's done (maybe that's ok?)
@Override
public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException {
UserFuncExpression copy = null;
Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Fri Apr 19 11:19:30 2013
@@ -367,7 +367,7 @@ cond : ^( OR cond cond )
in_eval: ^( IN expr expr+ )
;
-func_eval: ^( FUNC_EVAL func_name real_arg* )
+func_eval: ^( FUNC_EVAL func_name real_arg* ) | ^( INVOKER_FUNC_EVAL func_name IDENTIFIER real_arg* )
;
real_arg : expr | STAR | col_range
@@ -593,7 +593,7 @@ split_branch
}
;
-split_otherwise : ^( OTHERWISE alias )
+split_otherwise : ^( OTHERWISE alias )
{
aliases.add( $alias.name );
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Apr 19 11:19:30 2013
@@ -19,6 +19,7 @@
package org.apache.pig.parser;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.MalformedURLException;
@@ -28,6 +29,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.antlr.runtime.IntStream;
import org.antlr.runtime.RecognitionException;
@@ -39,6 +42,7 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.builtin.CubeDimensions;
+import org.apache.pig.builtin.InvokerGenerator;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.RANDOM;
import org.apache.pig.builtin.RollupDimensions;
@@ -50,6 +54,8 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.streaming.StreamingCommand;
@@ -97,6 +103,8 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
+import com.google.common.collect.Lists;
+
public class LogicalPlanBuilder {
private LogicalPlan plan = new LogicalPlan();
@@ -1409,6 +1417,34 @@ public class LogicalPlanBuilder {
}
+    /**
+     * Builds the expression for an INVOKE call. The expression starts out with a
+     * bare InvokerGenerator FuncSpec and is flagged for lazy initialization.
+     *
+     * @param loc         source location recorded on the expression
+     * @param plan        expression plan the new expression is created in
+     * @param packageName class name used for static calls
+     * @param funcName    name of the Java method to call
+     * @param isStatic    true for a static call, false when the first argument is the receiver
+     * @param args        argument expressions
+     * @return the new expression with its location set
+     */
+    LogicalExpression buildInvokerUDF(SourceLocation loc, LogicalExpressionPlan plan, String packageName, String funcName, boolean isStatic, List<LogicalExpression> args) throws RecognitionException {
+        FuncSpec placeholderSpec = new FuncSpec(InvokerGenerator.class.getName());
+        LogicalExpression invokerExpr = new UserFuncExpression(plan, placeholderSpec, args, false, true, isStatic, packageName, funcName);
+        invokerExpr.setLocation(loc);
+        return invokerExpr;
+    }
+
+    /**
+     * Returns the wrapper class for a primitive TYPE class, e.g.
+     * {@code Integer.TYPE -> Integer.class}.
+     *
+     * @param clazz one of the int, long, float, double, boolean, short, byte or
+     *              char primitive TYPE classes
+     * @return the corresponding wrapper class
+     * @throws RuntimeException if clazz is not one of those primitive classes
+     */
+    public static Class<?> typeToClass(Class<?> clazz) {
+        if (clazz == Integer.TYPE) {
+            return Integer.class;
+        } else if (clazz == Long.TYPE) {
+            return Long.class;
+        } else if (clazz == Float.TYPE) {
+            return Float.class;
+        } else if (clazz == Double.TYPE) {
+            return Double.class;
+        } else if (clazz == Boolean.TYPE) {
+            return Boolean.class;
+        } else if (clazz == Short.TYPE) {
+            return Short.class;
+        } else if (clazz == Byte.TYPE) {
+            return Byte.class;
+        } else if (clazz == Character.TYPE) {
+            return Character.class;
+        } else {
+            throw new RuntimeException("Was not given a primitive TYPE class: " + clazz);
+        }
+    }
+
LogicalExpression buildUDF(SourceLocation loc, LogicalExpressionPlan plan,
String funcName, List<LogicalExpression> args)
throws RecognitionException {
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Fri Apr 19 11:19:30 2013
@@ -801,6 +801,11 @@ func_eval[LogicalExpressionPlan plan] re
SourceLocation loc = new SourceLocation( (PigParserNode)$func_name.start );
$expr = builder.buildUDF( loc, $plan, $func_name.funcName, args );
}
+ | ^( INVOKER_FUNC_EVAL package_name=IDENTIFIER function_name=IDENTIFIER is_static=IDENTIFIER ( real_arg[$plan] { args.add( $real_arg.expr ); } )* )
+ {
+ SourceLocation loc = new SourceLocation( (PigParserNode)$function_name );
+ $expr = builder.buildInvokerUDF( loc, $plan, $package_name.text, $function_name.text, Boolean.parseBoolean($is_static.text), args );
+ }
;
real_arg [LogicalExpressionPlan plan] returns[LogicalExpression expr]
Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Fri Apr 19 11:19:30 2013
@@ -96,6 +96,9 @@ CUBE : 'CUBE'
ROLLUP : 'ROLLUP'
;
+INVOKE : 'INVOKE'
+;
+
DISTINCT : 'DISTINCT'
;
@@ -469,5 +472,8 @@ QMARK : '?'
ARROBA : '@'
;
+AMPERSAND : '&'
+;
+
FAT_ARROW : '=>'
;
Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1469783&r1=1469782&r2=1469783&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Fri Apr 19 11:19:30 2013
@@ -39,6 +39,8 @@ tokens {
FUNC;
FUNC_REF;
FUNC_EVAL;
+ INVOKE;
+ INVOKER_FUNC_EVAL;
CAST_EXPR;
COL_RANGE;
BIN_EXPR;
@@ -90,6 +92,8 @@ import org.apache.pig.parser.PigMacro;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.base.Joiner;
}
@members {
@@ -717,6 +721,7 @@ cast_expr
// careful with periods straight after the identifier, as we want those to be projections, not function
// calls
| col_ref_without_identifier projection*
+ | invoker_func projection*
| identifier_plus projection*
| identifier_plus func_name_suffix? LEFT_PAREN ( real_arg ( COMMA real_arg )* )? RIGHT_PAREN projection* -> ^( FUNC_EVAL identifier_plus func_name_suffix? real_arg* ) projection*
| func_name_without_columns LEFT_PAREN ( real_arg ( COMMA real_arg )* )? RIGHT_PAREN projection* -> ^( FUNC_EVAL func_name_without_columns real_arg* ) projection*
@@ -726,6 +731,16 @@ cast_expr
| bracket_expr
;
+invoker_func // two forms: INVOKE & [qualifier.]method(args)  or  INVOKE (target) [qualifier.]method(args)
+@init {
+ String staticStr = "true"; // flipped to "false" only when the parenthesized-target form is matched
+ List<String> packageStr = Lists.newArrayList(); // identifiers that precede a PERIOD, in source order
+ String methodStr = null; // text of the last identifier, i.e. the one right before the argument list
+}
+: INVOKE ( AMPERSAND | LEFT_PAREN real_arg { staticStr = "false"; } RIGHT_PAREN ) ( packageName=identifier_plus PERIOD { packageStr.add($packageName.text); } )* methodName=identifier_plus { methodStr=$methodName.text; } LEFT_PAREN ( real_arg ( COMMA real_arg )* )? RIGHT_PAREN
+ -> ^( INVOKER_FUNC_EVAL IDENTIFIER[Joiner.on(".").join(packageStr)] IDENTIFIER[methodStr] IDENTIFIER[staticStr] real_arg* ) // qualifier joined with ".", method name and static flag travel as IDENTIFIER tokens; real_arg* carries every parsed real_arg, the target (if any) first
+;
+
// now we have to deal with parentheses: in an expr, '(' can be the
// start of a cast, the start of a nested expression or the start of
// a tuple. We'll ensure parsing is unambiguous by assuming a single
@@ -820,7 +835,7 @@ projection : PERIOD ( col_ref | LEFT_PAR
// for disambiguation with func_names
col_ref_without_identifier : GROUP | DOLLARVAR
;
-
+
col_ref : col_ref_without_identifier | identifier_plus
;
Added: pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java (added)
+++ pig/trunk/test/org/apache/pig/builtin/TestInvokerGenerator.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,89 @@
+package org.apache.pig.builtin;
+
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests for {@code InvokerGenerator}. Each script binds an alias to a Java method with
+ * {@code define ... InvokerGenerator(...)} and then calls that alias like any other UDF;
+ * the output is compared with the result of calling the same method directly in Java.
+ */
+public class TestInvokerGenerator {
+    // Recreated in setUp() before each test, so these are per-instance rather than static.
+    private PigServer pigServer;
+    private Random r;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+        r = new Random(42L); // fixed seed keeps the generated input reproducible
+    }
+
+    @Test
+    public void testConcat() throws Exception {
+        Data data = resetData(pigServer);
+        Set<Tuple> inputs = ImmutableSet.of(tuple("a"), tuple("b"), tuple("c"));
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : inputs) {
+            String str = (String) t.get(0);
+            expected.add(tuple(str.concat(str)));
+        }
+        data.set("foo", Utils.getSchemaFromString("x:chararray"), inputs);
+
+        pigServer.registerQuery("define concat InvokerGenerator('java.lang.String','concat','String');");
+        pigServer.registerQuery("a = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate concat($0, $0);");
+        pigServer.registerQuery("store b into 'bar' using mock.Storage();");
+        assertSameTuples(expected, data.get("bar"));
+    }
+
+    @Test
+    public void testValueOf() throws Exception {
+        Data data = resetData(pigServer);
+        Set<Tuple> inputs = Sets.newHashSet();
+        while (inputs.size() < 1000) {
+            inputs.add(tuple(Integer.toString(r.nextInt())));
+        }
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : inputs) {
+            String str = (String) t.get(0);
+            expected.add(tuple(Integer.valueOf(str)));
+        }
+        data.set("foo", Utils.getSchemaFromString("x:chararray"), inputs);
+
+        pigServer.registerQuery("define valueOf InvokerGenerator('java.lang.Integer','valueOf','String');");
+        pigServer.registerQuery("a = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate valueOf($0);");
+        pigServer.registerQuery("store b into 'bar' using mock.Storage();");
+        assertSameTuples(expected, data.get("bar"));
+    }
+
+    /**
+     * Asserts that {@code results} contains exactly the tuples in {@code expected}, each once.
+     * Matched tuples are removed from {@code expected}, so the set is empty on success.
+     */
+    private static void assertSameTuples(Set<Tuple> expected, List<Tuple> results) {
+        assertEquals(expected.size(), results.size());
+        for (Tuple t : results) {
+            assertTrue(expected.remove(t));
+        }
+        assertEquals(0, expected.size());
+    }
+}
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java?rev=1469783&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltinInvoker.java Fri Apr 19 11:19:30 2013
@@ -0,0 +1,128 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * End-to-end tests for the {@code invoke} expression syntax exercised by the scripts below:
+ * {@code invoke(target)method(args)} on an instance and {@code invoke&Class.method(args)} for statics.
+ */
+public class TestBuiltinInvoker {
+    private static final Set<Tuple> CHAR_DATA = ImmutableSet.of(tuple("a"), tuple("b"), tuple("c"));
+    private static final Set<Tuple> CHAR_INT_DATA = ImmutableSet.of(tuple("1"), tuple("2"), tuple("3"));
+    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+    // Recreated in setUp() before each test, so these are per-instance rather than static.
+    private PigServer pigServer;
+    private Data data;
+    private Random r;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+        data = resetData(pigServer);
+        data.set("chardata", Utils.getSchemaFromString("x:chararray"), CHAR_DATA);
+        data.set("charintdata", Utils.getSchemaFromString("x:chararray"), CHAR_INT_DATA);
+        r = new Random(42L); // fixed seed keeps the generated input reproducible
+    }
+
+    @Test
+    public void testConcat() throws Exception {
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : CHAR_DATA) {
+            String str = (String) t.get(0);
+            expected.add(TUPLE_FACTORY.newTuple(str + str));
+        }
+
+        pigServer.registerQuery("a = load 'chardata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke(x)concat(x);");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        dataIsEqual(expected, data.get("res"));
+    }
+
+    @Test
+    public void testValueOf() throws Exception {
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : CHAR_INT_DATA) {
+            String str = (String) t.get(0);
+            expected.add(TUPLE_FACTORY.newTuple(Integer.valueOf(str)));
+        }
+
+        pigServer.registerQuery("a = load 'charintdata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke&Integer.valueOf(x);");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        dataIsEqual(expected, data.get("res"));
+    }
+
+    @Test
+    public void testManyConcat() throws Exception {
+        Set<Tuple> expected = Sets.newHashSet();
+        for (Tuple t : CHAR_DATA) {
+            String str = (String) t.get(0);
+            expected.add(TUPLE_FACTORY.newTuple(str + str + str + str));
+        }
+
+        pigServer.registerQuery("a = load 'chardata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke(invoke(invoke(x)concat(x))concat(x))concat(x);");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        dataIsEqual(expected, data.get("res"));
+    }
+
+    @Test
+    public void testTupleSize() throws Exception {
+        pigServer.registerQuery("a = load 'chardata' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate invoke(TOTUPLE(x))size();");
+        pigServer.registerQuery("store b into 'res' using mock.Storage();");
+        List<Tuple> results = data.get("res");
+        assertEquals(CHAR_DATA.size(), results.size());
+        for (Tuple t : results) {
+            assertEquals(Integer.valueOf(1), (Integer) t.get(0));
+        }
+    }
+
+    @Test
+    public void testStringSize() throws Exception {
+        Set<Tuple> input = Sets.newHashSet();
+        Set<Tuple> expected = Sets.newHashSet();
+        for (int i = 0; i < 1000; i++) {
+            String val = Integer.toString(r.nextInt());
+            input.add(tuple(val));
+            expected.add(tuple(val, val.length()));
+        }
+        data.set("foo", Utils.getSchemaFromString("x:chararray"), input);
+
+        pigServer.registerQuery("a = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("b = foreach @ generate $0, invoke($0)length();");
+        pigServer.registerQuery("store b into 'bar' using mock.Storage();");
+
+        dataIsEqual(expected, data.get("bar"));
+    }
+
+    /** Asserts both hold the same tuples, each once; removes matched tuples from expected. */
+    private void dataIsEqual(Set<Tuple> expected, Collection<Tuple> results) {
+        assertEquals(expected.size(), results.size());
+        for (Tuple t : results) {
+            assertTrue(expected.remove(t));
+        }
+        assertEquals(0, expected.size());
+    }
+}