You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/14 23:28:59 UTC
svn commit: r686049 - in /incubator/pig/branches/types:
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/
Author: olga
Date: Thu Aug 14 14:28:58 2008
New Revision: 686049
URL: http://svn.apache.org/viewvc?rev=686049&view=rev
Log:
PIG-368: support for filter UDFs
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=686049&r1=686048&r2=686049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Aug 14 14:28:58 2008
@@ -949,14 +949,16 @@
p.setResultType(func.getType());
currentPlan.add(p);
List<LogicalOperator> fromList = func.getPlan().getPredecessors(func);
- for (LogicalOperator op : fromList) {
- PhysicalOperator from = LogToPhyMap.get(op);
- try {
- currentPlan.connect(from, p);
- } catch (PlanException e) {
- log.error("Invalid physical operator in the plan"
- + e.getMessage());
- throw new VisitorException(e);
+ if(fromList!=null){
+ for (LogicalOperator op : fromList) {
+ PhysicalOperator from = LogToPhyMap.get(op);
+ try {
+ currentPlan.connect(from, p);
+ } catch (PlanException e) {
+ log.error("Invalid physical operator in the plan"
+ + e.getMessage());
+ throw new VisitorException(e);
+ }
}
}
LogToPhyMap.put(func, p);
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=686049&r1=686048&r2=686049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Thu Aug 14 14:28:58 2008
@@ -55,7 +55,9 @@
PhysicalPlan plan;
// The root comparison operator of the expression plan
- ComparisonOperator comOp;
+// ComparisonOperator comOp;
+ PhysicalOperator comOp;
+
// The operand type for the comparison operator needed
// to call the comparison operators getNext with the
@@ -176,8 +178,8 @@
public void setPlan(PhysicalPlan plan) {
this.plan = plan;
- comOp = (ComparisonOperator) (plan.getLeaves()).get(0);
- compOperandType = comOp.getOperandType();
+ comOp = plan.getLeaves().get(0);
+// compOperandType = comOp.getOperandType();
}
public PhysicalPlan getPlan() {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=686049&r1=686048&r2=686049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Aug 14 14:28:58 2008
@@ -1325,22 +1325,24 @@
// without this
// Assuming all aggregates has only one argument at this stage
- ExpressionOperator tmpExp = func.getArguments().get(0) ;
- if ( (ef instanceof Algebraic)
- && (tmpExp instanceof LOProject)
- && (((LOProject)tmpExp).getSentinel())) {
-
- FieldSchema tmpField ;
-
- try {
- // embed the schema above inside a bag
- tmpField = new FieldSchema(null, s, DataType.BAG) ;
- }
- catch (FrontendException e) {
- throw new VisitorException(e) ;
+ if(func.getArguments()!=null && func.getArguments().size()>0){
+ ExpressionOperator tmpExp = func.getArguments().get(0) ;
+ if ( (ef instanceof Algebraic)
+ && (tmpExp instanceof LOProject)
+ && (((LOProject)tmpExp).getSentinel())) {
+
+ FieldSchema tmpField ;
+
+ try {
+ // embed the schema above inside a bag
+ tmpField = new FieldSchema(null, s, DataType.BAG) ;
+ }
+ catch (FrontendException e) {
+ throw new VisitorException(e) ;
+ }
+
+ s = new Schema(tmpField) ;
}
-
- s = new Schema(tmpField) ;
}
// ask the EvalFunc what types of inputs it can handle
@@ -2302,6 +2304,9 @@
else if (op instanceof LOConst) {
// don't have to do anything
}
+ else if (op instanceof LOUserFunc){
+ visit((LOUserFunc)op);
+ }
else {
String msg = "Unsupported root operator in inner plan:"
+ op.getClass().getSimpleName() ;
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=686049&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Thu Aug 14 14:28:58 2008
@@ -0,0 +1,75 @@
+package org.apache.pig.test;
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFilterUDF extends TestCase {
+ private PigServer pigServer;
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ static public class MyFilterFunction extends EvalFunc<Boolean>{
+
+ @Override
+ public Boolean exec(Tuple input) throws IOException {
+ try {
+ int col = (Integer)input.get(0);
+ if(col>10)
+ return true;
+ } catch (ExecException e) {
+ // The exception is only printed here; execution then falls through to "return false" below
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ }
+
+ @Test
+ public void testFilterUDF() throws Exception{
+ int LOOP_SIZE = 20;
+ File tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ ps.println(i);
+ }
+ ps.close();
+ pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:int);");
+ pigServer.registerQuery("B = filter A by " + MyFilterFunction.class.getName() + "();");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+ if(!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertEquals(true,(Integer)t.get(0)>10);
+ ++cnt;
+ }
+ assertEquals(10, cnt);
+ }
+}