You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/14 23:28:59 UTC

svn commit: r686049 - in /incubator/pig/branches/types: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/impl/logicalLayer/validator...

Author: olga
Date: Thu Aug 14 14:28:58 2008
New Revision: 686049

URL: http://svn.apache.org/viewvc?rev=686049&view=rev
Log:
PIG-368: support for filter UDFs

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=686049&r1=686048&r2=686049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Aug 14 14:28:58 2008
@@ -949,14 +949,16 @@
         p.setResultType(func.getType());
         currentPlan.add(p);
         List<LogicalOperator> fromList = func.getPlan().getPredecessors(func);
-        for (LogicalOperator op : fromList) {
-            PhysicalOperator from = LogToPhyMap.get(op);
-            try {
-                currentPlan.connect(from, p);
-            } catch (PlanException e) {
-                log.error("Invalid physical operator in the plan"
-                        + e.getMessage());
-                throw new VisitorException(e);
+        if(fromList!=null){
+            for (LogicalOperator op : fromList) {
+                PhysicalOperator from = LogToPhyMap.get(op);
+                try {
+                    currentPlan.connect(from, p);
+                } catch (PlanException e) {
+                    log.error("Invalid physical operator in the plan"
+                            + e.getMessage());
+                    throw new VisitorException(e);
+                }
             }
         }
         LogToPhyMap.put(func, p);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=686049&r1=686048&r2=686049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Thu Aug 14 14:28:58 2008
@@ -55,7 +55,9 @@
     PhysicalPlan plan;
 
     // The root comparison operator of the expression plan
-    ComparisonOperator comOp;
+//    ComparisonOperator comOp;
+    PhysicalOperator comOp;
+    
 
     // The operand type for the comparison operator needed
     // to call the comparison operators getNext with the
@@ -176,8 +178,8 @@
 
     public void setPlan(PhysicalPlan plan) {
         this.plan = plan;
-        comOp = (ComparisonOperator) (plan.getLeaves()).get(0);
-        compOperandType = comOp.getOperandType();
+        comOp = plan.getLeaves().get(0);
+//        compOperandType = comOp.getOperandType();
     }
 
     public PhysicalPlan getPlan() {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=686049&r1=686048&r2=686049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Aug 14 14:28:58 2008
@@ -1325,22 +1325,24 @@
         // without this
 
         // Assuming all aggregates has only one argument at this stage
-        ExpressionOperator tmpExp = func.getArguments().get(0) ;
-        if ( (ef instanceof Algebraic)
-             && (tmpExp instanceof LOProject)
-             && (((LOProject)tmpExp).getSentinel())) {
-
-            FieldSchema tmpField ;
-
-            try {
-                // embed the schema above inside a bag
-                tmpField = new FieldSchema(null, s, DataType.BAG) ;
-            }
-            catch (FrontendException e) {
-                throw new VisitorException(e) ;
+        if(func.getArguments()!=null && func.getArguments().size()>0){
+            ExpressionOperator tmpExp = func.getArguments().get(0) ;
+            if ( (ef instanceof Algebraic)
+                 && (tmpExp instanceof LOProject)
+                 && (((LOProject)tmpExp).getSentinel())) {
+    
+                FieldSchema tmpField ;
+    
+                try {
+                    // embed the schema above inside a bag
+                    tmpField = new FieldSchema(null, s, DataType.BAG) ;
+                }
+                catch (FrontendException e) {
+                    throw new VisitorException(e) ;
+                }
+    
+                s = new Schema(tmpField) ;
             }
-
-            s = new Schema(tmpField) ;
         }
         
         // ask the EvalFunc what types of inputs it can handle
@@ -2302,6 +2304,9 @@
             else if (op instanceof LOConst) {
                 // don't have to do anything
             }
+            else if (op instanceof LOUserFunc){
+                visit((LOUserFunc)op);
+            }
             else {
                 String msg = "Unsupported root operator in inner plan:"
                              + op.getClass().getSimpleName() ;

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=686049&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Thu Aug 14 14:28:58 2008
@@ -0,0 +1,100 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end test that a user-defined {@link EvalFunc} returning
+ * {@link Boolean} can be used directly as a FILTER condition.
+ */
+public class TestFilterUDF extends TestCase {
+    private PigServer pigServer;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Filter predicate that accepts rows whose first field is an integer
+     * strictly greater than {@link #THRESHOLD}.
+     */
+    static public class MyFilterFunction extends EvalFunc<Boolean> {
+
+        /** Values must be strictly greater than this to pass the filter. */
+        static final int THRESHOLD = 10;
+
+        @Override
+        public Boolean exec(Tuple input) throws IOException {
+            // An empty tuple or a null field cannot satisfy the predicate;
+            // reject it instead of letting the Integer unboxing throw an NPE.
+            if (input == null || input.size() == 0) {
+                return false;
+            }
+            Object field;
+            try {
+                field = input.get(0);
+            } catch (ExecException e) {
+                // Propagate (with cause) instead of silently dropping the row.
+                IOException ioe = new IOException(
+                        "Unable to read field 0: " + e.getMessage());
+                ioe.initCause(e);
+                throw ioe;
+            }
+            if (field == null) {
+                return false;
+            }
+            return ((Integer) field).intValue() > THRESHOLD;
+        }
+    }
+
+    /**
+     * Loads the integers 1..LOOP_SIZE, filters them with
+     * {@link MyFilterFunction}, and checks that exactly the values above
+     * the threshold survive.
+     */
+    @Test
+    public void testFilterUDF() throws Exception {
+        final int LOOP_SIZE = 20;
+        File tmpFile = File.createTempFile("test", ".txt");
+        // Don't leave the input file behind in the temp directory.
+        tmpFile.deleteOnExit();
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        try {
+            for (int i = 1; i <= LOOP_SIZE; i++) {
+                ps.println(i);
+            }
+        } finally {
+            ps.close();
+        }
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:int);");
+        pigServer.registerQuery("B = filter A by " + MyFilterFunction.class.getName() + "();");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        if (!iter.hasNext()) fail("No Output received");
+        int cnt = 0;
+        while (iter.hasNext()) {
+            Tuple t = iter.next();
+            assertTrue((Integer) t.get(0) > MyFilterFunction.THRESHOLD);
+            ++cnt;
+        }
+        assertEquals(LOOP_SIZE - MyFilterFunction.THRESHOLD, cnt);
+    }
+}