You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/29 23:46:05 UTC
svn commit: r700270 - in /incubator/pig/branches/types: ./
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/validators/ src/org/apache/pig/pen/
test/org/apache/pig/test/
Author: olga
Date: Mon Sep 29 14:46:05 2008
New Revision: 700270
URL: http://svn.apache.org/viewvc?rev=700270&view=rev
Log:
PIG-427: casting input to UDF
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Mon Sep 29 14:46:05 2008
@@ -259,3 +259,5 @@
PIG-376: set job name (olgan)
PIG-463: POCast changes (pradeepk via olgan)
+
+ PIG-427: casting input to UDFs
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Mon Sep 29 14:46:05 2008
@@ -126,4 +126,8 @@
mFuncSpec = funcSpec;
}
+    /**
+     * Replaces this user function's argument expressions.
+     * The given list is stored as-is (it is not copied).
+     *
+     * @param args the new argument expressions
+     */
+    public void setMArgs(List<ExpressionOperator> args) {
+        this.mArgs = args;
+    }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Mon Sep 29 14:46:05 2008
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -40,7 +41,6 @@
import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType ;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.*;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.data.DataType ;
@@ -56,11 +56,46 @@
*/
public class TypeCheckingVisitor extends LOVisitor {
+ private static final int INF = -1; // score fitPossible() returns when no implicit-cast fit exists
+
private static final Log log = LogFactory.getLog(TypeCheckingVisitor.class);
private CompilationMessageCollector msgCollector = null ;
private boolean strictMode = false ;
+ // Implicit casts considered when fitting UDF arguments: source type -> allowed target types.
+ public static MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>(); // NOTE(review): public and non-final, so it can be reassigned or mutated from outside - consider final
+ static{ // commented-out entries below are not registered, so fitPossible() treats those casts as impossible (INF)
+ //Ordering here decides the score for the best fit function: fitPossible() charges
+ //(index of the target type in castLookup.get(source)) + 1 for every cast it needs.
+ //Do not change the order. Conversions to a smaller type are preferred over
+ //conversion to a bigger type, where the ordering of types from small to big is:
+ //INTEGER, LONG, FLOAT, DOUBLE, CHARARRAY, TUPLE, BAG, MAP
+// castLookup.put(DataType.BOOLEAN, DataType.INTEGER);
+// castLookup.put(DataType.BOOLEAN, DataType.LONG);
+// castLookup.put(DataType.BOOLEAN, DataType.FLOAT);
+// castLookup.put(DataType.BOOLEAN, DataType.DOUBLE);
+// castLookup.put(DataType.BOOLEAN, DataType.CHARARRAY);
+ castLookup.put(DataType.INTEGER, DataType.LONG);
+ castLookup.put(DataType.INTEGER, DataType.FLOAT);
+ castLookup.put(DataType.INTEGER, DataType.DOUBLE);
+// castLookup.put(DataType.INTEGER, DataType.CHARARRAY);
+ castLookup.put(DataType.LONG, DataType.FLOAT);
+ castLookup.put(DataType.LONG, DataType.DOUBLE);
+// castLookup.put(DataType.LONG, DataType.CHARARRAY);
+ castLookup.put(DataType.FLOAT, DataType.DOUBLE);
+// castLookup.put(DataType.FLOAT, DataType.CHARARRAY);
+// castLookup.put(DataType.DOUBLE, DataType.CHARARRAY);
+// castLookup.put(DataType.BYTEARRAY, DataType.BOOLEAN);
+ castLookup.put(DataType.BYTEARRAY, DataType.INTEGER);
+ castLookup.put(DataType.BYTEARRAY, DataType.LONG);
+ castLookup.put(DataType.BYTEARRAY, DataType.FLOAT);
+ castLookup.put(DataType.BYTEARRAY, DataType.DOUBLE);
+ castLookup.put(DataType.BYTEARRAY, DataType.CHARARRAY);
+ castLookup.put(DataType.BYTEARRAY, DataType.TUPLE);
+ castLookup.put(DataType.BYTEARRAY, DataType.BAG);
+ castLookup.put(DataType.BYTEARRAY, DataType.MAP);
+ }
public TypeCheckingVisitor(LogicalPlan plan,
CompilationMessageCollector messageCollector) {
@@ -1190,28 +1225,40 @@
} catch (Exception e) {
throw new VisitorException(e);
}
-
- if(funcSpecs != null) {
- // check the if a FuncSpec matching our schema exists
- FuncSpec matchingSpec = null;
- for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); iterator.hasNext();) {
- FuncSpec fs = iterator.next();
- if(Schema.equals(s, fs.getInputArgsSchema(), false, true)) {
- matchingSpec = fs;
+ FuncSpec matchingSpec = null;
+ if(funcSpecs!=null && funcSpecs.size()!=0){
+ //Some function mappings found. Trying to see
+ //if one of them fits the input schema
+ if((matchingSpec = exactMatch(funcSpecs, s))==null){
+ //Oops, no exact match found. Trying to see if we
+ //have mappings that we can fit using casts.
+ if(byteArrayFound(s) && funcSpecs.size()!=1){
+ //Oops, we have byte arrays and multiple mappings.
+ //Throw exception that we can't infer a fit.
+ String msg = "Could not infer the matching function for " + func.getFuncSpec() + " as multiple of them were found to match " + s.toString() + ". Please use an explicit cast." ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
+ }
+ else if((matchingSpec=bestFitMatch(funcSpecs,s))==null){
+ //Either no byte arrays found or there are byte arrays
+ //but only one mapping exists.
+ //However, we could not find a match as there were either
+ //none fitting the input schema or it was ambiguous.
+ //Throw exception that we can't infer a fit.
+ String msg = "Could not infer the matching function for " + func.getFuncSpec() + " as multiple or none of them fit. Please use an explicit cast." ;
+ msgCollector.collect(msg, MessageType.Error);
+ throw new VisitorException(msg) ;
}
-
- }
- if(matchingSpec == null) {
- StringBuilder sb = new StringBuilder();
- sb.append(func.getFuncSpec());
- sb.append(" does not work with inputs of type ");
- sb.append(s);
- throw new VisitorException(sb.toString());
- } else {
- func.setFuncSpec(matchingSpec);
}
}
-
+ if(matchingSpec!=null){
+ //Voila! We have a fitting match. Lets insert casts and make
+ //it work.
+ func.setFuncSpec(matchingSpec);
+ insertCastsForUDF(func, s, matchingSpec.getInputArgsSchema());
+ }
+
+ //Regenerate schema as there might be new additions
try {
func.regenerateFieldSchema();
} catch (FrontendException fee) {
@@ -1222,6 +1269,159 @@
throw new VisitorException(msg) ;
}
}
+
+    /**
+     * Picks the FuncSpec whose argument schema can be reached from the input
+     * schema with the cheapest set of implicit casts.
+     *
+     * @param funcSpecs - mappings provided by the udf
+     * @param s - input schema
+     * @return the spec with the strictly lowest fitPossible() score, or null
+     *         when no spec fits at all or when the lowest score is shared by
+     *         more than one spec (the choice would be ambiguous)
+     */
+    private FuncSpec bestFitMatch(List<FuncSpec> funcSpecs, Schema s) {
+        FuncSpec winner = null;
+        long lowest = Long.MAX_VALUE;
+        boolean tied = false;
+        for (FuncSpec candidate : funcSpecs) {
+            long cost = fitPossible(s, candidate.getInputArgsSchema());
+            if (cost == INF || cost > lowest) {
+                continue;
+            }
+            // Matching the current best makes the result ambiguous; a strictly
+            // better candidate clears any earlier tie.
+            tied = (cost == lowest);
+            lowest = cost;
+            winner = candidate;
+        }
+        return tied ? null : winner;
+    }
+
+    /**
+     * Reports whether any top-level field of the input schema has type
+     * DataType.BYTEARRAY.
+     *
+     * @param s - input schema (a null schema raises NullPointerException)
+     * @return true as soon as a bytearray field is seen, false otherwise
+     * @throws VisitorException wrapping any ParseException from Schema.getField
+     */
+    private boolean byteArrayFound(Schema s) throws VisitorException {
+        int idx = 0;
+        while (idx < s.size()) {
+            FieldSchema field;
+            try {
+                field = s.getField(idx);
+            } catch (ParseException pe) {
+                throw new VisitorException(pe);
+            }
+            if (field.type == DataType.BYTEARRAY) {
+                return true;
+            }
+            idx++;
+        }
+        return false;
+    }
+
+    /**
+     * Looks for a FuncSpec whose declared argument schema equals the input
+     * schema, as judged by Schema.equals(s, spec, false, true).
+     *
+     * @param funcSpecs - mappings provided by the udf
+     * @param s - input schema
+     * @return the first matching spec in list order, or null if none matches
+     */
+    private FuncSpec exactMatch(List<FuncSpec> funcSpecs, Schema s) {
+        for (FuncSpec candidate : funcSpecs) {
+            if (Schema.equals(s, candidate.getInputArgsSchema(), false, true)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Scores how cheaply the input schema s1 can be implicitly cast to the
+     * argument schema s2; lower is better.
+     *
+     * For every position whose fields differ, the cost is the position of
+     * the target type in castLookup's list for the source type, plus one.
+     * The summed cost is then multiplied by the number of casts, which
+     * favours fits needing fewer casts. Because of that multiplication the
+     * score is not a true distance metric (the triangle inequality does not
+     * hold).
+     *
+     * @param s1 the schema of the actual arguments
+     * @param s2 the argument schema declared by a FuncSpec
+     * @return the fit score (0 when no cast is needed), or INF when either
+     *         schema is null, the field counts differ, a field whose type
+     *         DataType.isSchemaType() accepts does not match strictly, or a
+     *         required cast is not listed in castLookup
+     */
+    private long fitPossible(Schema s1, Schema s2) {
+        if (s1 == null || s2 == null) {
+            return INF;
+        }
+        List<FieldSchema> actual = s1.getFields();
+        List<FieldSchema> declared = s2.getFields();
+        if (actual.size() != declared.size()) {
+            return INF;
+        }
+        long cost = 0;
+        int casts = 0;
+        int pos = 0;
+        while (pos < actual.size()) {
+            FieldSchema from = actual.get(pos);
+            FieldSchema to = declared.get(pos);
+            pos++;
+            // Schema-bearing fields are never cast: they must match strictly.
+            if (DataType.isSchemaType(from.type) && !FieldSchema.equals(from, to, false, true)) {
+                return INF;
+            }
+            if (FieldSchema.equals(from, to, true, true)) {
+                continue; // already compatible, no cast needed
+            }
+            boolean castable = castLookup.containsKey(from.type)
+                    && castLookup.get(from.type).contains(to.type);
+            if (!castable) {
+                return INF;
+            }
+            cost += ((List) castLookup.get(from.type)).indexOf(to.type) + 1;
+            casts++;
+        }
+        return cost * casts;
+    }
+
+    /**
+     * Makes the arguments of udf match the types declared by the chosen
+     * FuncSpec. Wherever an argument's type differs from the declared type,
+     * an LOCast is spliced into the plan between the argument expression and
+     * the udf, and that cast replaces the argument in the udf's argument list.
+     *
+     * @param udf     the user function whose arguments are adjusted
+     * @param fromSch schema of the actual arguments, one field per argument
+     * @param toSch   argument schema of the matched FuncSpec; presumably has
+     *                the same number of fields as fromSch (exactMatch and
+     *                fitPossible only accept equal-sized schemas) - confirm
+     *                for any new caller
+     */
+    private void insertCastsForUDF(LOUserFunc udf, Schema fromSch, Schema toSch) {
+        List<FieldSchema> fromFields = fromSch.getFields();
+        List<FieldSchema> toFields = toSch.getFields();
+        List<ExpressionOperator> args = udf.getArguments();
+        List<ExpressionOperator> newArgs = new ArrayList<ExpressionOperator>(args.size());
+        for (int i = 0; i < fromFields.size(); i++) {
+            FieldSchema fFSch = fromFields.get(i);
+            FieldSchema tFSch = toFields.get(i);
+            ExpressionOperator input = args.get(i);
+            if (fFSch.type == tFSch.type) {
+                newArgs.add(input);
+                continue;
+            }
+            collectCastWarning(udf, fFSch.type, tFSch.type);
+            LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan();
+            OperatorKey newKey = genNewOperatorKey(udf);
+            LOCast cast = new LOCast(currentPlan, newKey, input, tFSch.type);
+            // Rewire the plan: input -> cast -> udf instead of input -> udf.
+            currentPlan.add(cast);
+            currentPlan.disconnect(input, udf);
+            try {
+                currentPlan.connect(input, cast);
+                currentPlan.connect(cast, udf);
+            } catch (PlanException pe) {
+                AssertionError err = new AssertionError(
+                        "Explicit casting insertion");
+                err.initCause(pe);
+                throw err;
+            }
+            // The cast now feeds the udf, so it takes the original argument's
+            // slot; dropping it would shrink and misalign the argument list.
+            newArgs.add(cast);
+        }
+        udf.setMArgs(newArgs);
+    }
/**
* For Bincond, lhsOp and rhsOp must have the same output type
Modified: incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java Mon Sep 29 14:46:05 2008
@@ -36,7 +36,6 @@
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.util.IdentityHashSet;
-import com.sun.tools.jdi.EventSetImpl.Itr;
//These methods are used to generate equivalence classes given the operator name and the output from the operator
//For example, it gives out 2 eq. classes for filter, one that passes the filter and one that doesn't
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java?rev=700270&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java Mon Sep 29 14:46:05 2008
@@ -0,0 +1,176 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;
+
+import junit.framework.TestCase;
+
+public class TestBestFitCast extends TestCase {
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+ private File tmpFile;
+
+ public TestBestFitCast() throws ExecException, IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+// pigServer = new PigServer(ExecType.LOCAL);
+ int LOOP_SIZE = 20;
+ tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ long l = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ ps.println(l + "\t" + i);
+ }
+ ps.close();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public static class UDF1 extends EvalFunc<Tuple>{
+ /**
+ * java level API
+ * @param input expects a single numeric DataAtom value
+ * @param output returns a single numeric DataAtom value, cosine value of the argument
+ */
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return input;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+ */
+ @Override
+ public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+ List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+ funcList.add(new FuncSpec(this.getClass().getName(), new Schema(Arrays.asList(new Schema.FieldSchema(null, DataType.FLOAT),new Schema.FieldSchema(null, DataType.FLOAT)))));
+ funcList.add(new FuncSpec(this.getClass().getName(), new Schema(Arrays.asList(new Schema.FieldSchema(null, DataType.LONG),new Schema.FieldSchema(null, DataType.DOUBLE)))));
+ funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.FLOAT))));
+ funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
+ funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.DOUBLE))));
+ /*funcList.add(new FuncSpec(DoubleMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
+ funcList.add(new FuncSpec(FloatMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
+ funcList.add(new FuncSpec(IntMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
+ funcList.add(new FuncSpec(LongMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
+ funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));*/
+ return funcList;
+ }
+
+ }
+
+ @Test
+ public void test1() throws Exception{
+ //Passing (long, int)
+ //Possible matches: (float, float) , (long, double)
+ //Chooses (long, double) as it has only one cast compared to two for (float, float)
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, y:int);");
+ pigServer.registerQuery("B = FOREACH A generate x, " + UDF1.class.getName() + "(x,y);");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+ if(!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertEquals(true,((Tuple)t.get(1)).get(0) instanceof Long);
+ assertEquals(true,((Tuple)t.get(1)).get(1) instanceof Double);
+ ++cnt;
+ }
+ assertEquals(20, cnt);
+ }
+
+ @Test
+ public void test2() throws Exception{
+ //Passing (int, int)
+ //Possible matches: (float, float) , (long, double)
+ //Throws Exception as ambiguous definitions found
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, y:int);");
+ pigServer.registerQuery("B = FOREACH A generate x, " + UDF1.class.getName() + "(y,y);");
+ try{
+ pigServer.openIterator("B");
+ }catch (Exception e) {
+ String msg = e.getMessage();
+ assertEquals(true,msg.contains("as multiple or none of them fit"));
+ }
+
+ }
+
+ @Test
+ public void test3() throws Exception{
+ //Passing (int, int)
+ //Possible matches: (float, float) , (long, double)
+ //Chooses (float, float) as both options lead to same score and (float, float) occurs first.
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, y:int);");
+ pigServer.registerQuery("B = FOREACH A generate x, " + UDF1.class.getName() + "((float)y,(float)y);");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+ if(!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertEquals(true,((Tuple)t.get(1)).get(0) instanceof Float);
+ assertEquals(true,((Tuple)t.get(1)).get(1) instanceof Float);
+ ++cnt;
+ }
+ assertEquals(20, cnt);
+ }
+
+ @Test
+ public void test4() throws Exception{
+ //Passing (long)
+ //Possible matches: (float), (integer), (double)
+ //Chooses (float) as it leads to a better score that to (double)
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, y:int);");
+ pigServer.registerQuery("B = FOREACH A generate x, " + UDF1.class.getName() + "(x);");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+ if(!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertEquals(true,((Tuple)t.get(1)).get(0) instanceof Float);
+ ++cnt;
+ }
+ assertEquals(20, cnt);
+ }
+
+ @Test
+ public void test5() throws Exception{
+ //Passing bytearrays
+ //Possible matches: (float, float) , (long, double)
+ //Throws exception since more than one funcSpec and inp is bytearray
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pigServer.registerQuery("B = FOREACH A generate $0, " + UDF1.class.getName() + "($1,$1);");
+ try{
+ pigServer.openIterator("B");
+ }catch (Exception e) {
+ String msg = e.getMessage();
+ assertEquals(true,msg.contains("multiple of them were found to match"));
+ }
+
+ }
+}