You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/12/08 20:54:25 UTC

svn commit: r724462 - in /hadoop/pig/branches/types: ./ src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

Author: olga
Date: Mon Dec  8 11:54:25 2008
New Revision: 724462

URL: http://svn.apache.org/viewvc?rev=724462&view=rev
Log:
PIG-546: FilterFunc calls empty constructor when it should be calling parameterized constructor

Added:
    hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Dec  8 11:54:25 2008
@@ -325,3 +325,6 @@
     PIG-538: support for null constants (pradeepk via olgan)
 
     PIG-385: more null handling (pradeepl via olgan)
+
+    PIG-546: FilterFunc calls empty constructor when it should be calling
+    parameterized constructor

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Dec  8 11:54:25 2008
@@ -54,6 +54,9 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.EvalFunc;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -615,6 +618,33 @@
         }
 }
 
+/** Codes for the kinds of UDF the parser binds; validated by tryCasting(). */
+class FunctionType {
+    public static final byte UNKNOWNFUNC = 0;
+    public static final byte EVALFUNC = 2;
+    public static final byte COMPARISONFUNC = 4;
+    public static final byte LOADFUNC = 8;
+    public static final byte STOREFUNC = 16;
+
+    /** Throws an Exception naming both classes unless func is an instance of
+     *  the UDF base class implied by funcType, or of an unknown funcType. */
+    public static void tryCasting(Object func, byte funcType) throws Exception {
+        Class<?> expected;
+        switch(funcType) {
+        case FunctionType.EVALFUNC:       expected = EvalFunc.class;       break;
+        case FunctionType.COMPARISONFUNC: expected = ComparisonFunc.class; break;
+        case FunctionType.LOADFUNC:       expected = LoadFunc.class;       break;
+        case FunctionType.STOREFUNC:      expected = StoreFunc.class;      break;
+        default:
+            throw new Exception("Received an unknown function type: " + funcType);
+        }
+        // null is let through (as the old cast did); callers report a null func themselves.
+        if (func != null && !expected.isInstance(func)) {
+            throw new Exception(func.getClass().getName() + " is not a " + expected.getName());
+        }
+    }
+}
+
 PARSER_END(QueryParser)
 
 // Skip all the new lines, tabs and spaces
@@ -972,12 +1002,7 @@
 {
 	(	filename = FileName()
 		(
-		<USING>  funcName = QualifiedFunction() "(" funcArgs = StringList() ")"
-		{
-			funcSpecAsString = funcName + "(" + funcArgs + ")";
-			funcSpec = new FuncSpec(funcSpecAsString);
-			log.debug("LoadClause: funcSpec = " + funcSpec);
-		}
+        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.LOADFUNC)
 		)?
 		(
 		<SPLIT> <BY> t3 = <QUOTEDSTRING>
@@ -990,7 +1015,7 @@
 		)?
 	)
 	{
-		if (funcSpecAsString == null){
+		if (funcSpec == null){
 			funcSpecAsString = PigStorage.class.getName();
 			funcSpec = new FuncSpec(funcSpecAsString);
 			log.debug("LoadClause: funcSpec = " + funcSpec);
@@ -1272,23 +1297,7 @@
 				log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
 			}
 		)
-|	LOOKAHEAD(EvalFunction() "(") 
-		(evalFunc=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")" 
-			{
-                FuncSpec funcSpec = new FuncSpec(evalFunc.getClass().getName());
-                Type javaType = evalFunc.getReturnType();
-                byte type = DataType.findType(javaType);
-
-                log.debug("Return type of UDF: " + DataType.findTypeName(type));
-				cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, args, type);
-				lp.add(cond);
-				log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
-				for(ExpressionOperator exprOp: args) {
-					lp.connect(exprOp, cond);
-					log.debug("PUnaryCond: Added operator " + exprOp.getClass().getName() + " " + cond + " to logical plan " + lp);
-				}
-			}
-		)
+|	LOOKAHEAD(EvalFuncSpec(over, specs, lp, input)) cond = EvalFuncSpec(over,specs,lp, input, FunctionType.EVALFUNC)
 |	cond = PNullCond(over,specs,lp,input)
 |	cond = PNotCond(over,specs,lp,input)
 
@@ -1433,6 +1442,7 @@
 	boolean asc = true; 
 	String funcName = null; 
 	Token t1;
+    FuncSpec funcSpec = null;
 	log.trace("Entering OrderClause");
 }
 {
@@ -1461,13 +1471,13 @@
 		}	
 	)
 	(
-        <USING>  funcName = QualifiedFunction()
+	    <USING>  funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC)
     )?
 
 	)
 	{
 		LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder, 
-		                          (funcName != null ? new FuncSpec(funcName) : null));
+		                          funcSpec );
 		sort.setStar(star);
 		sort.setLimit(-1);
 		lp.add(sort);
@@ -1860,17 +1870,16 @@
     String stream, deserializer;
     StreamingCommand.HandleSpec[] handleSpecs;
     String functionName = "PigStorage", functionArgs="";
+    byte funcType = (handle.compareTo(StreamingCommand.Handle.INPUT) != 0 ? FunctionType.LOADFUNC : FunctionType.STOREFUNC) ;
+    FuncSpec funcSpec = null;
 } 
 {
     stream = CommandStream() 
     [
-        <USING> functionName = QualifiedFunction() 
-        [
-            "(" functionArgs = StringList() ")"
-        ]
+	    <USING>  funcSpec = NonEvalFuncSpec(funcType)
     ]
     {
-        deserializer = functionName + "(" + functionArgs + ")";
+        deserializer = (funcSpec == null? functionName + "(" + ")" : funcSpec.toString());
         command.addHandleSpec(handle, 
                               new HandleSpec(stream, deserializer)
                              );
@@ -1879,13 +1888,10 @@
         "," 
         stream = CommandStream() 
         [
-            <USING> functionName = QualifiedFunction() 
-            [
-                "(" functionArgs = StringList() ")"
-            ]
+	        <USING>  funcSpec = NonEvalFuncSpec(funcType)
         ] 
         {
-            deserializer = functionName + "(" + functionArgs + ")";
+            deserializer = (funcSpec == null? functionName + "(" + ")" : funcSpec.toString());
             command.addHandleSpec(handle, 
                                   new HandleSpec(stream, deserializer)
                                  );
@@ -1921,22 +1927,20 @@
 	}
 }
 LogicalOperator StoreClause(LogicalPlan lp) : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; 
-                                                String functionName, functionArgs;}
+                                                String functionName, functionArgs; FuncSpec funcSpec = null;}
 {
     t = <IDENTIFIER> <INTO> fileName = FileName()
     (
-        <USING> functionName = QualifiedFunction() {functionSpec = functionName;}
-        (
-            "(" functionArgs = StringList() ")" {functionSpec = functionSpec + "(" + functionArgs + ")";}
-        )?
+        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.STOREFUNC)
     )?
     {
 
-        if (functionSpec == null)
-            functionSpec = PigStorage.class.getName() + "()";
+        if (funcSpec == null) {
+            funcSpec = new FuncSpec(PigStorage.class.getName() + "()");
+        }
 
         LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),
-                                            new FileSpec(fileName, new FuncSpec(functionSpec)));
+                                            new FileSpec(fileName, funcSpec));
 
         LogicalOperator input = mapAliasOp.get(t.image);
         if (input == null)
@@ -1975,7 +1979,7 @@
 	(
 	t = <IDENTIFIER> "="
 	(
-	LOOKAHEAD(FuncEvalSpec(over, specs, lp, input)) item = InfixExpr(over,specs,lp, input)
+	LOOKAHEAD(EvalFuncSpec(over, specs, lp, input, FunctionType.EVALFUNC)) item = InfixExpr(over,specs,lp, input)
     {
         lp.add(item);
     }
@@ -2126,6 +2130,7 @@
 	LogicalOperator eOp;
 	Token t; 
 	boolean asc = true; 
+    FuncSpec funcSpec = null;
 	log.trace("Entering NestedSortOrArrange");}
 {
 	(
@@ -2157,13 +2162,13 @@
 			}		
 	)     
     (
-        <USING>  funcName = QualifiedFunction()
+        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC)
     )?
 	)
 	{	
 		log.debug("Before creating LOSort");
 		LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder, 
-		                          (funcName != null ? new FuncSpec(funcName) : null));
+		                          funcSpec);
 		sort.setStar(star);
 		log.debug("After creating LOSort");
 		try {
@@ -2479,8 +2484,8 @@
     LOOKAHEAD(Const(lp)) item = Const(lp)
 |	(
 	(
-		LOOKAHEAD(FuncEvalSpec(over,specs,lp,input))
-		item = FuncEvalSpec(over,specs,lp,input)
+		LOOKAHEAD(EvalFuncSpec(over,specs,lp,input, FunctionType.EVALFUNC))
+		item = EvalFuncSpec(over,specs,lp,input, FunctionType.EVALFUNC)
 	|	item = ColOrSpec(over,specs,lp,input) 
 	| 	item = BinCond(over,specs,lp,input)
 	
@@ -2547,7 +2552,7 @@
 }
 
 
-ExpressionOperator FuncEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+ExpressionOperator EvalFuncSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, byte funcType) : 
 {
 	String funcName = null; 
 	FuncSpec funcSpec = null; 
@@ -2556,55 +2561,117 @@
 	List<ExpressionOperator> args;
 	ExpressionOperator userFunc;
     LOUserFunc userAliasFunc = null;
-    EvalFunc evalFunc = null;
-	log.trace("Entering FuncEvalSpec");
+    Object func = null;
+	log.trace("Entering EvalFuncSpec");
 }
 {
 	(
     (
     LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction()
     {
-        
 		try{
+            func = pigContext.instantiateFuncFromAlias(funcNameAlias); // inside try: failures surface as ParseException
-			evalFunc = (EvalFunc) pigContext.instantiateFuncFromAlias(funcNameAlias);
-            Type javaType = evalFunc.getReturnType();
-            log.debug("Type: " + javaType);
-            log.debug("funcName: " + funcName + " class name: " + evalFunc.getClass().getName() + " return type: " + DataType.findType(javaType));
-		}catch (Exception e){
+            FunctionType.tryCasting(func, funcType);
+		} catch (Exception e){
 			throw new ParseException(e.getMessage());
 		}
     }
     )
-|   evalFunc=EvalFunction()
+|   func=EvalFunction(funcType)
     )
     "(" args=EvalArgs(over,specs,lp,input) ")" 
 	{
-		if(null != evalFunc) {
-            funcName = evalFunc.getClass().getName();
+		if(null != func) {
+            funcName = func.getClass().getName();
             if(null != funcNameAlias) {
                 funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
             } else {
                 funcSpec = new FuncSpec(funcName);
             }
-            Type javaType = evalFunc.getReturnType();
-            byte type = DataType.findType(javaType);
-            log.debug("Return type of UDF: " + DataType.findTypeName(type));
-            log.debug("FuncEvalSpec: funcSpec: " + funcSpec);
+            byte type = DataType.BYTEARRAY;
+            switch(funcType) {
+            case FunctionType.EVALFUNC:
+                Type javaType = ((EvalFunc)func).getReturnType();
+                type = DataType.findType(javaType);
+                log.debug("Return type of UDF: " + DataType.findTypeName(type));
+                log.debug("EvalFuncSpec: funcSpec: " + funcSpec);
+                break;
+            default:
+                throw new ParseException("Received an unknown function type: " + funcType);
+            }
 			userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, args, type);
         } else {
             throw new ParseException("Could not instantiate function: " + funcNameAlias);
         }
 		lp.add(userFunc);
-		log.debug("FuncEvalSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
+		log.debug("EvalFuncSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
 		for(ExpressionOperator exprOp: args) {
 			lp.connect(exprOp, userFunc);
-			log.debug("FuncEvalSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
+			log.debug("EvalFuncSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
 		}
-		log.trace("Exiting FuncEvalSpec");
+		log.trace("Exiting EvalFuncSpec");
 		return userFunc;
 	}
 }
 
+/*
+ * Parses the USING clause of LOAD, STORE, ORDER and STREAM input/output and
+ * returns the FuncSpec of the named non-eval UDF.  The name is either a
+ * DEFINEd alias, whose stored FuncSpec keeps its constructor arguments, or a
+ * qualified class name optionally followed by ('arg', ...).  The function is
+ * instantiated once from that FuncSpec to check that it exists and is of the
+ * kind given by funcType (COMPARISONFUNC, LOADFUNC or STOREFUNC); any failure
+ * is reported as a ParseException.
+ */
+FuncSpec NonEvalFuncSpec(byte funcType) : 
+{
+    String functionName = null;
+    FuncSpec funcSpec = null;
+    String funcNameAlias = null;
+    String functionArgs = null;
+    Object func = null;
+    log.trace("Entering NonEvalFuncSpec");
+}
+{
+    (
+    (
+    LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction()
+    {
+        // DEFINEd alias: its FuncSpec already carries the constructor arguments.
+        funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
+    }
+    )
+|   functionName = QualifiedFunction() ( "(" functionArgs = StringList() ")" )?
+    {
+        // Inline use: any quoted arguments are passed to the constructor.
+        funcSpec = new FuncSpec(functionName + "(" + (functionArgs == null ? "" : functionArgs) + ")");
+    }
+    )
+    {
+        switch(funcType) {
+        case FunctionType.COMPARISONFUNC:
+        case FunctionType.LOADFUNC:
+        case FunctionType.STOREFUNC:
+            break;
+        default:
+            throw new ParseException("Received an unknown function type: " + funcType);
+        }
+        log.debug("NonEvalFuncSpec: funcSpec = " + funcSpec);
+        try {
+            // Instantiate through the full spec so the parameterized
+            // constructor is used (PIG-546), then verify the UDF kind.
+            func = pigContext.instantiateFuncFromSpec(funcSpec);
+            FunctionType.tryCasting(func, funcType);
+        } catch (Exception e) {
+            ParseException pe = new ParseException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+        log.trace("Exiting NonEvalFuncSpec");
+        return funcSpec;
+    }
+}
+
 List<ExpressionOperator> EvalArgs(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
 {
 	ArrayList<ExpressionOperator> specList = new ArrayList<ExpressionOperator>(); 
@@ -2943,26 +3010,24 @@
 
 // These the simple non-terminals that are shared across many
 
-EvalFunc  EvalFunction() : 
+Object  EvalFunction(byte funcType) : 
 {
 	String funcName;
-    EvalFunc ef;
+    Object func = null;
 	log.trace("Entering EvalFunction");
 }
 {
 	funcName = QualifiedFunction()
 	{
 		try{
+            func = pigContext.instantiateFuncFromAlias(funcName); // inside try: failures surface as ParseException
-			ef = (EvalFunc) pigContext.instantiateFuncFromAlias(funcName);
-            Type javaType = ef.getReturnType();
-            log.debug("Type: " + javaType);
-            log.debug("funcName: " + funcName + " class name: " + ef.getClass().getName() + " return type: " + DataType.findType(javaType));
+            FunctionType.tryCasting(func, funcType);
 		}catch (Exception e){
 			throw new ParseException(e.getMessage());
 		}
 		log.trace("Exiting EvalFunction");
 		
-		return ef;
+		return func;
 	}
 }
 

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Mon Dec  8 11:54:25 2008
@@ -32,9 +32,9 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.test.utils.*;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,8 +44,9 @@
     private MiniCluster cluster = MiniCluster.buildCluster();
     private File tmpFile;
     
+    TupleFactory tf = TupleFactory.getInstance();
+
     public TestFilterUDF() throws ExecException, IOException{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         int LOOP_SIZE = 20;
         tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -57,11 +58,29 @@
     
     @Before
     public void setUp() throws Exception {
-        
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        int LOOP_SIZE = 20;
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            ps.println(i);
+        }
+        ps.close();
     }
 
     @After
     public void tearDown() throws Exception {
+        tmpFile.delete();
+    }
+    
+    private File createFile(String[] data) throws Exception{
+        File f = File.createTempFile("tmp", "");
+        PrintWriter pw = new PrintWriter(f);
+        for (int i=0; i<data.length; i++){
+            pw.println(data[i]);
+        }
+        pw.close();
+        return f;
     }
     
     static public class MyFilterFunction extends EvalFunc<Boolean>{
@@ -96,4 +115,38 @@
         }
         assertEquals(10, cnt);
     }
+
+    /** A DEFINEd FILTERFROMFILE must be built with its file argument (PIG-546). */
+    @Test
+    public void testFilterUDFusingDefine() throws Exception{
+        File inputFile = createFile(new String[]{
+                "www.paulisageek.com\t4",
+                "www.yahoo.com\t12344",
+                "google.com\t1",
+                "us2.amazon.com\t4141"});
+        File filterFile = createFile(new String[]{ "12344" });
+
+        pigServer.registerQuery("define FILTER_CRITERION " + FILTERFROMFILE.class.getName() + "('" + FileLocalizer.hadoopify(Util.generateURI(filterFile.toString()), pigServer.getPigContext()) + "');");
+        pigServer.registerQuery("a = LOAD '" + Util.generateURI(inputFile.toString()) + "' as (url:chararray, numvisits:int);");
+        pigServer.registerQuery("b = filter a by FILTER_CRITERION(numvisits);");
+
+        Tuple expectedTuple = tf.newTuple();
+        expectedTuple.append("www.yahoo.com");
+        expectedTuple.append(Integer.valueOf(12344));
+
+        try {
+            Iterator<Tuple> iter = pigServer.openIterator("b");
+            int count = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedTuple, iter.next());
+                count++;
+            }
+            // Exactly the 12344 row must pass; also guards against a vacuous pass.
+            assertEquals(1, count);
+        } finally {
+            inputFile.delete();
+            filterFile.delete();
+        }
+    }
+        
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Mon Dec  8 11:54:25 2008
@@ -1839,7 +1839,44 @@
         "CONCAT(null, str);";
         buildPlan(query);
     }
+
+    @Test
+    public void testFilterUdfDefine() {
+        String query = "define isempty IsEmpty(); a = load 'a' as (x:int, y:double, str:chararray);" +
+        		"b = filter a by isempty(*);";
+        buildPlan(query);
+    }
+    
+    @Test
+    public void testLoadUdfDefine() {
+        String query = "define PS PigStorage(); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+        		"b = filter a by IsEmpty(*);";
+        buildPlan(query);
+    }
     
+    @Test
+    public void testLoadUdfConstructorArgDefine() {
+        String query = "define PS PigStorage(':'); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+        		"b = filter a by IsEmpty(*);";
+        buildPlan(query);
+    }
+    
+    @Test
+    public void testStoreUdfDefine() {
+        String query = "define PS PigStorage(); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+        		"b = filter a by IsEmpty(*);" +
+                "store b into 'x' using PS;" ;
+        buildPlan(query);
+    }
+    
+    @Test
+    public void testStoreUdfConstructorArgDefine() {
+        String query = "define PS PigStorage(':'); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+        		"b = filter a by IsEmpty(*);" +
+                "store b into 'x' using PS;" ;
+        buildPlan(query);
+    }
+
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Mon Dec  8 11:54:25 2008
@@ -302,6 +302,77 @@
     }
 
     @Test
+    public void testInputShipSpecsWithUDFDefine() throws Exception {
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print STDOUT \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        
+        pigServer.registerQuery(
+                "define PS " + PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery(
+                "define CMD1 `" + command1.getName() + " foo` " +
+                "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
+                "input('foo' using PS ) " +
+                "output(stdout using PS ) " +
+                "stderr();"); 
+        pigServer.registerQuery(
+                "define CMD2 `" + command2.getName() + " bar` " +
+                "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
+                "input('bar' using PS ) " +
+                "output(stdout using PS ) " +        
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS ;");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+        		                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+        
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        
+        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+
+    @Test
     public void testInputCacheSpecs() throws Exception {
         // Can't run this without HDFS
         if(execType == ExecType.LOCAL)
@@ -437,6 +508,68 @@
     }
 
     @Test
+    public void testOutputShipSpecsWithUDFDefine() throws Exception {
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script: echoes STDIN to $ARGV[0] and STDERR, writes an "A,10" row to $ARGV[1] per line
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "while (<STDIN>) {",
+                          "  print OUTFILE \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "  print OUTFILE2 \"A,10\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "A", "A", "A", "A", "A"};
+        Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
+        Tuple[] expectedResults = 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define PS " + PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "output('foo' using PS, " +
+                "'bar' using PS) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS;");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+        InputStream op = FileLocalizer.open(output+"/bar", 
+                                            pigServer.getPigContext());
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+    @Test
     public void testInputOutputSpecs() throws Exception {
         // FIXME : this should be tested in all modes
         if(execType == ExecType.LOCAL)

Added: hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=724462&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java Mon Dec  8 11:54:25 2008
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.lang.Boolean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+
+/**
+ * FilterFunc that keeps rows whose first argument appears in the first
+ * tab-separated column of the DFS file named by the constructor argument:
+ *
+ * define MyFilterSet org.apache.pig.test.utils.FILTERFROMFILE('/user/pig/filterfile');
+ * A = load 'mydata' using PigStorage() as ( a, b );
+ * B = filter A by MyFilterSet(a);
+ */
+public class FILTERFROMFILE extends FilterFunc{
+    /** DFS path of the filter file; empty when the no-arg constructor is used. */
+    private String FilterFileName = "";
+
+    /** No-arg constructor; leaves the filter file name empty. */
+    public FILTERFROMFILE(){
+    }
+
+    /** @param FilterFileName DFS path of the file listing the accepted values */
+    public FILTERFROMFILE(String FilterFileName){
+        this.FilterFileName = FilterFileName;
+    }
+
+    /** Accepted values; stays null until init() has read the whole file. */
+    Map<String, Boolean> lookupTable = null;
+
+    /** Loads the filter file and only then publishes it as lookupTable. */
+    private void init() throws IOException {
+        Map<String, Boolean> table = new HashMap<String, Boolean>();
+        Properties props = ConfigurationUtil.toProperties(PigInputFormat.sJob);
+        InputStream is = FileLocalizer.openDFSFile(FilterFileName, props);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+        try {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                table.put(line.split("\t")[0], Boolean.TRUE);
+            }
+        } finally {
+            reader.close(); // release the DFS stream even if reading fails
+        }
+        lookupTable = table;
+    }
+
+    /**
+     * Returns true if the first field of input appears in the filter file;
+     * a null first field never matches.
+     */
+    @Override
+    public Boolean exec(Tuple input) throws IOException {
+        if (lookupTable == null) {
+            init();
+        }
+        Object field;
+        try {
+            field = input.get(0);
+        } catch (ExecException e) {
+            IOException ioe = new IOException("Error getting data");
+            ioe.initCause(e);
+            throw ioe;
+        }
+        return field != null && lookupTable.containsKey(field.toString());
+    }
+}