You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/14 00:52:03 UTC

svn commit: r656043 [3/3] - in /incubator/pig/branches/types: lib-src/bzip2/org/apache/tools/bzip2r/ lib-src/shock/org/apache/pig/shock/ src/org/apache/pig/builtin/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/validators/ ...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java Tue May 13 15:52:02 2008
@@ -38,26 +38,26 @@
         super(plan, walker);
     }
 
-	/**
-	 * check if the transform should be done.  If this is being called then
-	 * the pattern matches, but there may be other criteria that must be met
-	 * as well.
-	 * @param nodes - List of nodes declared in transform ($1 = nodes[0],
-	 * etc.)  Remember that somes entries in node[] may be NULL since they may
-	 * not be created until after the transform.
-	 * @returns - true if the transform should be done.
-	 */
-	public abstract boolean check(List<O> nodes);
+    /**
+     * check if the transform should be done.  If this is being called then
+     * the pattern matches, but there may be other criteria that must be met
+     * as well.
+     * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+     * etc.)  Remember that some entries in nodes[] may be NULL since they may
+     * not be created until after the transform.
+     * @return - true if the transform should be done.
+     */
+    public abstract boolean check(List<O> nodes);
 
-	/**
-	 * Transform the tree
-	 * @param nodes - List of nodes declared in transform ($1 = nodes[0],
-	 * etc.)  This call must destruct any nodes that are being removed as part
-	 * of the transform and remove them from the nodes vector and construct
-	 * any that are being created as part of the transform and add them at the
-	 * appropriate point to the nodes vector.
-	 */
-	public abstract void transform(List<O> nodes);
+    /**
+     * Transform the tree
+     * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+     * etc.)  This call must destruct any nodes that are being removed as part
+     * of the transform and remove them from the nodes vector and construct
+     * any that are being created as part of the transform and add them at the
+     * appropriate point to the nodes vector.
+     */
+    public abstract void transform(List<O> nodes);
 
 }
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue May 13 15:52:02 2008
@@ -694,12 +694,12 @@
         }
     } 
 
-	@Test
+    @Test
     public void testQueryFail17(){
         buildPlan("a = load 'a' as (url, host, rank);");
         buildPlan("b = group a by url; ");
         try {
-        	LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
+            LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
         } catch (AssertionFailedError e) {
             assertTrue(e.getMessage().contains("Exception"));
         }
@@ -713,7 +713,7 @@
         //buildPlan("b = group a by 2*3;");
         //String query = "d = foreach b generate group;";
         buildPlan(query);
-		buildPlan("e = foreach a generate name, details;");
+        buildPlan("e = foreach a generate name, details;");
     }
 
     @Test
@@ -723,15 +723,15 @@
         buildPlan("b = group a by details;");
         String query = "d = foreach b generate group.age;";
         buildPlan(query);
-		buildPlan("e = foreach a generate name, details;");
-		buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});");
+        buildPlan("e = foreach a generate name, details;");
+        buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});");
     }
 
     @Test
     public void testQueryFail18() {
         String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
         try {
-        	buildPlan(query);
+            buildPlan(query);
         } catch (AssertionFailedError e) {
             assertTrue(e.getMessage().contains("Exception"));
         }
@@ -744,7 +744,7 @@
         String query = "c = cross a,b;";
         buildPlan(query);
         try {
-        	buildPlan("d = order c by name, b::name, height, a::gpa;");
+            buildPlan("d = order c by name, b::name, height, a::gpa;");
         } catch (AssertionFailedError e) {
             assertTrue(e.getMessage().contains("Exception"));
         }
@@ -783,7 +783,7 @@
             
             //Just the top level roots and their children
             //Need a recursive one to travel down the tree
-			/*
+            /*
             for(LogicalOperator op: lp.getRoots()) {
                 System.err.println("Logical Plan Root: " + op.getClass().getName() + " object " + op);    
 
@@ -797,7 +797,7 @@
                     }
                 }
             }
-			*/
+            */
             assertTrue(lp != null);
             return lp;
         } catch (IOException e) {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java Tue May 13 15:52:02 2008
@@ -40,68 +40,68 @@
 import junit.framework.TestCase;
 
 public class TestPOBinCond extends TestCase {
-	Random r = new Random();
-	DataBag bag = BagFactory.getInstance().newDefaultBag();
-	final int MAX = 10;
-	
-	@Before
-	@Override
-	public void setUp() {
-		for(int i = 0; i < 10; i ++) {
-			Tuple t = TupleFactory.getInstance().newTuple();
-			t.append(r.nextInt(2));
-			t.append(0);
-			t.append(1);
-			bag.add(t);
-		}
-	}
-	
-	public void testPOBinCond() throws ExecException, PlanException {
-		ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
-		rt.setValue(1);
-		rt.setResultType(DataType.INTEGER);
-		
-		POProject prj1 = GenPhyOp.exprProject();
-		prj1.setColumn(0);
-		prj1.setResultType(DataType.INTEGER);
-		
-		EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
-		equal.setLhs(prj1);
-		equal.setRhs(rt);
-		
-		POProject prjLhs = GenPhyOp.exprProject();
-		prjLhs.setResultType(DataType.INTEGER);
-		prjLhs.setColumn(1);
-		
-		POProject prjRhs = GenPhyOp.exprProject();
-		prjRhs.setResultType(DataType.INTEGER);
-		prjRhs.setColumn(2);
-		
-		POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs);
-		op.setResultType(DataType.INTEGER);
-		
-		ExprPlan plan = new ExprPlan();
-		plan.add(op);
-		plan.add(prjLhs);
-		plan.add(prjRhs);
-		plan.add(equal);
-		plan.connect(equal, op);
-		plan.connect(prjLhs, op);
-		plan.connect(prjRhs, op);
-		
-		plan.add(prj1);
-		plan.add(rt);
-		plan.connect(prj1, equal);
-		plan.connect(rt, equal);
-		
-		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan.attachInput(t);
-			Integer i = (Integer) t.get(0);
-			assertEquals(1, i | (Integer)op.getNext(i).result);
-//			System.out.println(t + " " + op.getNext(i).result.toString());
-		}
-		
-		
-	}
+    Random r = new Random();
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    final int MAX = 10;
+    
+    @Before
+    @Override
+    public void setUp() {
+        for(int i = 0; i < 10; i ++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(r.nextInt(2));
+            t.append(0);
+            t.append(1);
+            bag.add(t);
+        }
+    }
+    
+    public void testPOBinCond() throws ExecException, PlanException {
+        ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
+        rt.setValue(1);
+        rt.setResultType(DataType.INTEGER);
+        
+        POProject prj1 = GenPhyOp.exprProject();
+        prj1.setColumn(0);
+        prj1.setResultType(DataType.INTEGER);
+        
+        EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
+        equal.setLhs(prj1);
+        equal.setRhs(rt);
+        
+        POProject prjLhs = GenPhyOp.exprProject();
+        prjLhs.setResultType(DataType.INTEGER);
+        prjLhs.setColumn(1);
+        
+        POProject prjRhs = GenPhyOp.exprProject();
+        prjRhs.setResultType(DataType.INTEGER);
+        prjRhs.setColumn(2);
+        
+        POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs);
+        op.setResultType(DataType.INTEGER);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(op);
+        plan.add(prjLhs);
+        plan.add(prjRhs);
+        plan.add(equal);
+        plan.connect(equal, op);
+        plan.connect(prjLhs, op);
+        plan.connect(prjRhs, op);
+        
+        plan.add(prj1);
+        plan.add(rt);
+        plan.connect(prj1, equal);
+        plan.connect(rt, equal);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Integer i = (Integer) t.get(0);
+            assertEquals(1, i | (Integer)op.getNext(i).result);
+//            System.out.println(t + " " + op.getNext(i).result.toString());
+        }
+        
+        
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java Tue May 13 15:52:02 2008
@@ -41,49 +41,49 @@
 import org.junit.Test;
 
 public class TestPODistinct extends TestCase {
-	DataBag input = BagFactory.getInstance().newDefaultBag();
-	Random r = new Random();
-	final int MAX_VALUE = 10;
-	final int MAX_SAMPLES = 100;
+    DataBag input = BagFactory.getInstance().newDefaultBag();
+    Random r = new Random();
+    final int MAX_VALUE = 10;
+    final int MAX_SAMPLES = 100;
 
-	@Before
-	public void setUp() {
-		TupleFactory tf = TupleFactory.getInstance();
-		for (int i = 0; i < MAX_SAMPLES; i++) {
-			Tuple t = tf.newTuple();
-			t.append(r.nextInt(MAX_VALUE));
-			input.add(t);
-			// System.out.println(t);
-		}
-		// System.out.println();
-	}
+    @Before
+    public void setUp() {
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt(MAX_VALUE));
+            input.add(t);
+            // System.out.println(t);
+        }
+        // System.out.println();
+    }
 
-	@Test
-	public void testPODistict() throws ExecException {
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
-				-1, inputs);
-		Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
-		Tuple t = null;
-		Result res = distinct.getNext(t);
-		t = (Tuple) res.result;
-		while (res.returnStatus != POStatus.STATUS_EOP) {
-			if (output.containsKey(t)) {
-				int i = output.get(t);
-				output.put(t, ++i);
-			} else {
-				output.put(t, 1);
-			}
-			res = distinct.getNext(t);
-			t = (Tuple) res.result;
-		}
-		for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
-			int i = e.getValue();
-			// System.out.println(e.getKey());
-			assertEquals(1, i);
-		}
-	}
+    @Test
+    public void testPODistict() throws ExecException {
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
+                -1, inputs);
+        Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
+        Tuple t = null;
+        Result res = distinct.getNext(t);
+        t = (Tuple) res.result;
+        while (res.returnStatus != POStatus.STATUS_EOP) {
+            if (output.containsKey(t)) {
+                int i = output.get(t);
+                output.put(t, ++i);
+            } else {
+                output.put(t, 1);
+            }
+            res = distinct.getNext(t);
+            t = (Tuple) res.result;
+        }
+        for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
+            int i = e.getValue();
+            // System.out.println(e.getKey());
+            assertEquals(1, i);
+        }
+    }
 
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java Tue May 13 15:52:02 2008
@@ -36,113 +36,113 @@
 import junit.framework.TestCase;
 
 public class TestPONegative extends TestCase {
-	
-	DataBag bag = BagFactory.getInstance().newDefaultBag();
-	Random r = new Random();
-	TupleFactory tf = TupleFactory.getInstance();
-	final int MAX = 10;
-	
-	public void testPONegInt () throws PlanException, ExecException {
-		for(int i = 0; i < MAX; i++) {
-			Tuple t = tf.newTuple();
-			t.append(r.nextInt());
-			bag.add(t);
-		}
-		
-		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		prj.setResultType(DataType.INTEGER);
-		PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
-		pn.setResultType(DataType.INTEGER);
-		
-		ExprPlan plan = new ExprPlan();
-		plan.add(prj); plan.add(pn);
-		plan.connect(prj, pn);
-		
-		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan.attachInput(t);
-			Integer expected = -(Integer)t.get(0);
-			int output = (Integer) pn.getNext(expected).result;
-			assertEquals(expected.intValue(), output);
-		}
-		
-	}
-	
-	public void testPONegLong () throws PlanException, ExecException {
-		for(int i = 0; i < MAX; i++) {
-			Tuple t = tf.newTuple();
-			t.append(r.nextLong());
-			bag.add(t);
-		}
-		
-		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		prj.setResultType(DataType.LONG);
-		PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
-		pn.setResultType(DataType.LONG);
-		
-		ExprPlan plan = new ExprPlan();
-		plan.add(prj); plan.add(pn);
-		plan.connect(prj, pn);
-		
-		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan.attachInput(t);
-			Long expected = -(Long)t.get(0);
-			long output = (Long) pn.getNext(expected).result;
-			assertEquals(expected.longValue(), output);
-		}
-		
-	}
-	
-	public void testPONegDouble() throws PlanException, ExecException {
-		for(int i = 0; i < MAX; i++) {
-			Tuple t = tf.newTuple();
-			t.append(r.nextDouble());
-			bag.add(t);
-		}
-		
-		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		prj.setResultType(DataType.DOUBLE);
-		PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
-		pn.setResultType(DataType.DOUBLE);
-		
-		ExprPlan plan = new ExprPlan();
-		plan.add(prj); plan.add(pn);
-		plan.connect(prj, pn);
-		
-		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan.attachInput(t);
-			Double expected = -(Double)t.get(0);
-			double output = (Double) pn.getNext(expected).result;
-			assertEquals(expected.doubleValue(), output);
-		}
-		
-	}
-	
-	public void testPONegFloat() throws PlanException, ExecException {
-		for(int i = 0; i < MAX; i++) {
-			Tuple t = tf.newTuple();
-			t.append(r.nextFloat());
-			bag.add(t);
-		}
-		
-		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		prj.setResultType(DataType.FLOAT);
-		PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
-		pn.setResultType(DataType.FLOAT);
-		
-		ExprPlan plan = new ExprPlan();
-		plan.add(prj); plan.add(pn);
-		plan.connect(prj, pn);
-		
-		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan.attachInput(t);
-			Float expected = -(Float)t.get(0);
-			float output = (Float) pn.getNext(expected).result;
-			assertEquals(expected.floatValue(), output);
-		}
-		
-	}
+    
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    Random r = new Random();
+    TupleFactory tf = TupleFactory.getInstance();
+    final int MAX = 10;
+    
+    public void testPONegInt () throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.INTEGER);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.INTEGER);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Integer expected = -(Integer)t.get(0);
+            int output = (Integer) pn.getNext(expected).result;
+            assertEquals(expected.intValue(), output);
+        }
+        
+    }
+    
+    public void testPONegLong () throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextLong());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.LONG);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.LONG);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Long expected = -(Long)t.get(0);
+            long output = (Long) pn.getNext(expected).result;
+            assertEquals(expected.longValue(), output);
+        }
+        
+    }
+    
+    public void testPONegDouble() throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextDouble());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.DOUBLE);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.DOUBLE);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Double expected = -(Double)t.get(0);
+            double output = (Double) pn.getNext(expected).result;
+            assertEquals(expected.doubleValue(), output);
+        }
+        
+    }
+    
+    public void testPONegFloat() throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextFloat());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.FLOAT);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.FLOAT);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Float expected = -(Float)t.get(0);
+            float output = (Float) pn.getNext(expected).result;
+            assertEquals(expected.floatValue(), output);
+        }
+        
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Tue May 13 15:52:02 2008
@@ -42,183 +42,183 @@
 import org.junit.Test;
 
 public class TestPOSort extends TestCase {
-	Random r = new Random();
-	int MAX_TUPLES = 10;
+    Random r = new Random();
+    int MAX_TUPLES = 10;
 
-	@Test
-	public void testPOSortAscString() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-				MAX_TUPLES, 100);
-		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		pr1.setResultType(DataType.CHARARRAY);
-		ExprPlan expPlan = new ExprPlan();
-		expPlan.add(pr1);
-		sortPlans.add(expPlan);
-		List<Boolean> mAscCols = new LinkedList<Boolean>();
-		mAscCols.add(true);
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-				sortPlans, mAscCols, null);
-		Tuple t = null;
-		Result res1 = sort.getNext(t);
-		// System.out.println(res1.result);
-		Result res2 = sort.getNext(t);
-		while (res2.returnStatus != POStatus.STATUS_EOP) {
-			Object i1 = ((Tuple) res1.result).get(0);
-			Object i2 = ((Tuple) res2.result).get(0);
-			int i = DataType.compare(i1, i2);
-			// System.out.println(res2.result + " i = " + i);
-			assertEquals(true, (i <= 0));
-			res1 = res2;
-			res2 = sort.getNext(t);
-		}
-	}
-
-	@Test
-	public void testPOSortDescString() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-				MAX_TUPLES, 100);
-		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		pr1.setResultType(DataType.CHARARRAY);
-		ExprPlan expPlan = new ExprPlan();
-		expPlan.add(pr1);
-		sortPlans.add(expPlan);
-		List<Boolean> mAscCols = new LinkedList<Boolean>();
-		mAscCols.add(false);
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-				sortPlans, mAscCols, null);
-		Tuple t = null;
-		Result res1 = sort.getNext(t);
-		// System.out.println(res1.result);
-		Result res2 = sort.getNext(t);
-		while (res2.returnStatus != POStatus.STATUS_EOP) {
-			Object i1 = ((Tuple) res1.result).get(0);
-			Object i2 = ((Tuple) res2.result).get(0);
-			int i = DataType.compare(i1, i2);
-			// System.out.println(res2.result + " i = " + i);
-			assertEquals(true, (i >= 0));
-			res1 = res2;
-			res2 = sort.getNext(t);
-		}
-	}
-
-	@Test
-	public void testPOSortAsc() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-				MAX_TUPLES, 100);
-		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-		pr1.setResultType(DataType.INTEGER);
-		ExprPlan expPlan = new ExprPlan();
-		expPlan.add(pr1);
-		sortPlans.add(expPlan);
-		List<Boolean> mAscCols = new LinkedList<Boolean>();
-		mAscCols.add(true);
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-				sortPlans, mAscCols, null);
-		Tuple t = null;
-		Result res1 = sort.getNext(t);
-		// System.out.println(res1.result);
-		Result res2 = sort.getNext(t);
-		while (res2.returnStatus != POStatus.STATUS_EOP) {
-			Object i1 = ((Tuple) res1.result).get(1);
-			Object i2 = ((Tuple) res2.result).get(1);
-			int i = DataType.compare(i1, i2);
-			assertEquals(true, (i <= 0));
-			// System.out.println(res2.result);
-			res1 = res2;
-			res2 = sort.getNext(t);
-		}
-	}
-
-	@Test
-	public void testPOSortDesc() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-				MAX_TUPLES, 100);
-		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-		pr1.setResultType(DataType.INTEGER);
-		ExprPlan expPlan = new ExprPlan();
-		expPlan.add(pr1);
-		sortPlans.add(expPlan);
-		List<Boolean> mAscCols = new LinkedList<Boolean>();
-		mAscCols.add(false);
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-				sortPlans, mAscCols, null);
-		Tuple t = null;
-		Result res1 = sort.getNext(t);
-		// System.out.println(res1.result);
-		Result res2 = sort.getNext(t);
-		while (res2.returnStatus != POStatus.STATUS_EOP) {
-			Object i1 = ((Tuple) res1.result).get(1);
-			Object i2 = ((Tuple) res2.result).get(1);
-			int i = DataType.compare(i1, i2);
-			assertEquals(true, (i >= 0));
-			// System.out.println(res2.result);
-			res1 = res2;
-			res2 = sort.getNext(t);
-		}
-	}
-
-	@Test
-	public void testPOSortUDF() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-				MAX_TUPLES, 100);
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		String funcName = WeirdComparator.class.getName() + "()";
-		/*POUserFunc comparator = new POUserFunc(
-				new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
-		POUserFunc comparator = new POUserComparisonFunc(
-				new OperatorKey("", r.nextLong()), -1, null, funcName);
-		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-				null, null, comparator);
-		Tuple t = null;
-		Result res1 = sort.getNext(t);
-		// System.out.println(res1.result);
-		Result res2 = sort.getNext(t);
-		while (res2.returnStatus != POStatus.STATUS_EOP) {
-			int i1 = (Integer) ((Tuple) res1.result).get(1);
-			int i2 = (Integer) ((Tuple) res2.result).get(1);
-			int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
-			assertEquals(true, (i <= 0));
-			System.out.println(i + " : " + res2.result);
-			res1 = res2;
-			res2 = sort.getNext(t);
-		}
-	}
-
-	// sorts values in ascending order of their distance from 50
-	public static class WeirdComparator extends ComparisonFunc {
-
-		@Override
-		public int compare(Tuple t1, Tuple t2) {
-			// TODO Auto-generated method stub
-			int result = 0;
-			try {
-				int i1 = (Integer) t1.get(1);
-				int i2 = (Integer) t2.get(1);
-				result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
-			} catch (ExecException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-			}
-			return result;
-		}
+    @Test
+    public void testPOSortAscString() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        pr1.setResultType(DataType.CHARARRAY);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(true);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(0);
+            Object i2 = ((Tuple) res2.result).get(0);
+            int i = DataType.compare(i1, i2);
+            // System.out.println(res2.result + " i = " + i);
+            assertEquals(true, (i <= 0));
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortDescString() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        pr1.setResultType(DataType.CHARARRAY);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(0);
+            Object i2 = ((Tuple) res2.result).get(0);
+            int i = DataType.compare(i1, i2);
+            // System.out.println(res2.result + " i = " + i);
+            assertEquals(true, (i >= 0));
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortAsc() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr1.setResultType(DataType.INTEGER);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(true);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(1);
+            Object i2 = ((Tuple) res2.result).get(1);
+            int i = DataType.compare(i1, i2);
+            assertEquals(true, (i <= 0));
+            // System.out.println(res2.result);
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortDesc() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr1.setResultType(DataType.INTEGER);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(1);
+            Object i2 = ((Tuple) res2.result).get(1);
+            int i = DataType.compare(i1, i2);
+            assertEquals(true, (i >= 0));
+            // System.out.println(res2.result);
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortUDF() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        String funcName = WeirdComparator.class.getName() + "()";
+        /*POUserFunc comparator = new POUserFunc(
+                new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
+        POUserFunc comparator = new POUserComparisonFunc(
+                new OperatorKey("", r.nextLong()), -1, null, funcName);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                null, null, comparator);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            int i1 = (Integer) ((Tuple) res1.result).get(1);
+            int i2 = (Integer) ((Tuple) res2.result).get(1);
+            int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+            assertEquals(true, (i <= 0));
+            System.out.println(i + " : " + res2.result);
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    // sorts values in ascending order of their distance from 50
+    public static class WeirdComparator extends ComparisonFunc {
+
+        @Override
+        public int compare(Tuple t1, Tuple t2) {
+            // TODO Auto-generated method stub
+            int result = 0;
+            try {
+                int i1 = (Integer) t1.get(1);
+                int i2 = (Integer) t2.get(1);
+                result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+            } catch (ExecException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            return result;
+        }
 
-	}
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Tue May 13 15:52:02 2008
@@ -47,286 +47,286 @@
 import org.junit.Test;
 
 public class TestPOUserFunc extends TestCase {
-	Random r = new Random();
-	int MAX_TUPLES = 10;
+    Random r = new Random();
+    int MAX_TUPLES = 10;
 
-	public static class ARITY extends EvalFunc<Integer> {
+    public static class ARITY extends EvalFunc<Integer> {
 
-		@Override
-		public Integer exec(Tuple input) throws IOException {
-			return new Integer(input.size());
-		}
-
-		@Override
-		public Schema outputSchema(Schema input) {
-			// TODO FIX
-			// return new AtomSchema("arity");
-			return null;
-		}
-	}
-
-	public static class WeirdComparator extends ComparisonFunc {
-
-		@Override
-		public int compare(Tuple t1, Tuple t2) {
-			// TODO Auto-generated method stub
-			Object o1 = null;
-			Object o2 = null;
-			try {
-				o1 = t1.get(2);
-				o2 = t2.get(2);
-			} catch (ExecException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-			}
-			int i1 = (Integer) o1 - 2;
-			int i2 = (Integer) o2 - 2;
-
-			return (int) (i1 * i1 - i2 * i2);
-		}
-
-	}
-
-	/**
-	 * Generates the average of the values of the first field of a tuple. This
-	 * class is Algebraic in implemenation, so if possible the execution will be
-	 * split into a local and global application
-	 */
-	public static class AVG extends EvalFunc<Double> implements Algebraic {
-
-		private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-		@Override
-		public Double exec(Tuple input) throws IOException {
-			double sum = 0;
-			double count = 0;
-			
-			try {
-				sum = sum(input);
-				count = count(input);
-			} catch (ExecException e) {
-				e.printStackTrace();
-			}
-
-			double avg = 0;
-			if (count > 0)
-				avg = sum / count;
-
-			return new Double(avg);
-		}
-
-		public String getInitial() {
-			return Initial.class.getName();
-		}
-
-		public String getIntermed() {
-			return Intermed.class.getName();
-		}
-
-		public String getFinal() {
-			return Final.class.getName();
-		}
-
-		static public class Initial extends EvalFunc<Tuple> {
-			@Override
-			public Tuple exec(Tuple input) throws IOException {
-				try {
-					Tuple t = mTupleFactory.newTuple(2);
-					t.set(0, new Double(sum(input)));
-					t.set(1, new Long(count(input)));
-					return t;
-				} catch (ExecException t) {
-					throw new RuntimeException(t.getMessage() + ": " + input);
-				}
-			}
-		}
-
-		static public class Intermed extends EvalFunc<Tuple> {
-			@Override
-			public Tuple exec(Tuple input) throws IOException {
-				DataBag b = null;
-				Tuple t = null;
-				try {
-					b = (DataBag) input.get(0);
-					t = combine(b);
-				} catch (ExecException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-				return t;
-			}
-		}
-
-		static public class Final extends EvalFunc<Double> {
-			@Override
-			public Double exec(Tuple input) throws IOException {
-				double sum = 0;
-				double count = 0;
-				try {
-					DataBag b = (DataBag) input.get(0);
-					Tuple combined = combine(b);
-
-					sum = (Double) combined.get(0);
-					count = (Long) combined.get(1);
-				} catch (ExecException e) {
-					e.printStackTrace();
-				}
-
-				double avg = 0;
-				if (count > 0) {
-					avg = sum / count;
-				}
-				return new Double(avg);
-			}
-		}
-
-		static protected Tuple combine(DataBag values) throws ExecException {
-			double sum = 0;
-			long count = 0;
-
-			Tuple output = mTupleFactory.newTuple(2);
-
-			for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-				Tuple t = it.next();
-				sum += (Double) t.get(0);
-				count += (Long) t.get(1);
-			}
-
-			output.set(0, new Double(sum));
-			output.set(1, new Long(count));
-			return output;
-		}
-
-		static protected long count(Tuple input) throws ExecException {
-			DataBag values = (DataBag) input.get(0);
-			return values.size();
-		}
-
-		static protected double sum(Tuple input) throws ExecException {
-			DataBag values = (DataBag) input.get(0);
-
-			double sum = 0;
-			for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-				Tuple t = it.next();
-				Double d = DataType.toDouble(t.get(0));
-				if (d == null)
-					continue;
-				sum += d;
-			}
-
-			return sum;
-		}
-
-		@Override
-		public Schema outputSchema(Schema input) {
-			// TODO FIX
-			// return new AtomSchema("average");
-			return null;
-		}
-
-	}
-
-	@Test
-	public void testUserFuncArity() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-				MAX_TUPLES, 100);
-		String funcSpec = ARITY.class.getName() + "()";
-		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-		inputs.add(read);
-		POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
-				-1, inputs, funcSpec);
-		Result res = new Result();
-		Integer i = null;
-		res = userFunc.getNext(i);
-		while (res.returnStatus != POStatus.STATUS_EOP) {
-			// System.out.println(res.result);
-			int result = (Integer) res.result;
-			assertEquals(2, result);
-			res = userFunc.getNext(i);
-		}
-	}
-
-	@Test
-	public void testUDFCompare() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
-				100);
-		String funcSpec = WeirdComparator.class.getName() + "()";
-		POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()),
-				-1, null, funcSpec);
-		Iterator<Tuple> it = input.iterator();
-		Tuple t1 = it.next();
-		Tuple t2 = it.next();
-		t1.append(2);
-		t2.append(3);
-		((POUserComparisonFunc)userFunc).attachInput(t1, t2);
-		Integer i = null;
-		// System.out.println(t1 + " " + t2);
-		int result = (Integer) (userFunc.getNext(i).result);
-		assertEquals(-1, result);
-	}
-
-	@Test
-	public void testAlgebraicAVG() throws IOException, ExecException {
-		int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-		byte INIT = 0;
-		byte INTERMED = 1;
-		byte FINAL = 2;
-		Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
-				input);
-		Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
-				input);
-		// System.out.println("Input = " + tup1);
-		String funcSpec = AVG.class.getName() + "()";
-
-		POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
-				null, funcSpec);
-
-		TupleFactory tf = TupleFactory.getInstance();
-
-		po.setAlgebraicFunction(INIT);
-		po.attachInput(tup1);
-		Tuple t = null;
-		Result res = po.getNext(t);
-		Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
-				: null;
-		Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
-				: null;
-		System.out.println(outputInitial1 + " " + outputInitial2);
-		assertEquals(outputInitial1, outputInitial2);
-		double sum = (Double) outputInitial1.get(0);
-		long count = (Long) outputInitial1.get(1);
-		assertEquals(55.0, sum);
-		assertEquals(10, count);
-		DataBag bag = BagFactory.getInstance().newDefaultBag();
-		bag.add(outputInitial1);
-		bag.add(outputInitial2);
-		Tuple outputInitial = tf.newTuple();
-		outputInitial.append(bag);
-		// Tuple outputIntermed = intermed.exec(outputInitial);
-		po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
-				funcSpec);
-		po.setAlgebraicFunction(INTERMED);
-		po.attachInput(outputInitial);
-		res = po.getNext(t);
-		Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
-				: null;
-
-		sum = (Double) outputIntermed.get(0);
-		count = (Long) outputIntermed.get(1);
-		assertEquals(110.0, sum);
-		assertEquals(20, count);
-		System.out.println(outputIntermed);
-		po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
-				funcSpec);
-		po.setAlgebraicFunction(FINAL);
-		po.attachInput(outputInitial);
-		res = po.getNext(t);
-		Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result
-				: null;
-		// Double output = fin.exec(outputInitial);
-		assertEquals(5.5, output);
-		// System.out.println("output = " + output);
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            return new Integer(input.size());
+        }
+
+        @Override
+        public Schema outputSchema(Schema input) {
+            // TODO FIX
+            // return new AtomSchema("arity");
+            return null;
+        }
+    }
+
+    public static class WeirdComparator extends ComparisonFunc {
+
+        @Override
+        public int compare(Tuple t1, Tuple t2) {
+            // TODO Auto-generated method stub
+            Object o1 = null;
+            Object o2 = null;
+            try {
+                o1 = t1.get(2);
+                o2 = t2.get(2);
+            } catch (ExecException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            int i1 = (Integer) o1 - 2;
+            int i2 = (Integer) o2 - 2;
+
+            return (int) (i1 * i1 - i2 * i2);
+        }
+
+    }
+
+    /**
+     * Generates the average of the values of the first field of a tuple. This
+     * class is Algebraic in implementation, so if possible the execution will be
+     * split into a local and global application
+     */
+    public static class AVG extends EvalFunc<Double> implements Algebraic {
+
+        private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            double sum = 0;
+            double count = 0;
+            
+            try {
+                sum = sum(input);
+                count = count(input);
+            } catch (ExecException e) {
+                e.printStackTrace();
+            }
+
+            double avg = 0;
+            if (count > 0)
+                avg = sum / count;
+
+            return new Double(avg);
+        }
+
+        public String getInitial() {
+            return Initial.class.getName();
+        }
+
+        public String getIntermed() {
+            return Intermed.class.getName();
+        }
+
+        public String getFinal() {
+            return Final.class.getName();
+        }
+
+        static public class Initial extends EvalFunc<Tuple> {
+            @Override
+            public Tuple exec(Tuple input) throws IOException {
+                try {
+                    Tuple t = mTupleFactory.newTuple(2);
+                    t.set(0, new Double(sum(input)));
+                    t.set(1, new Long(count(input)));
+                    return t;
+                } catch (ExecException t) {
+                    throw new RuntimeException(t.getMessage() + ": " + input);
+                }
+            }
+        }
+
+        static public class Intermed extends EvalFunc<Tuple> {
+            @Override
+            public Tuple exec(Tuple input) throws IOException {
+                DataBag b = null;
+                Tuple t = null;
+                try {
+                    b = (DataBag) input.get(0);
+                    t = combine(b);
+                } catch (ExecException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+                return t;
+            }
+        }
+
+        static public class Final extends EvalFunc<Double> {
+            @Override
+            public Double exec(Tuple input) throws IOException {
+                double sum = 0;
+                double count = 0;
+                try {
+                    DataBag b = (DataBag) input.get(0);
+                    Tuple combined = combine(b);
+
+                    sum = (Double) combined.get(0);
+                    count = (Long) combined.get(1);
+                } catch (ExecException e) {
+                    e.printStackTrace();
+                }
+
+                double avg = 0;
+                if (count > 0) {
+                    avg = sum / count;
+                }
+                return new Double(avg);
+            }
+        }
+
+        static protected Tuple combine(DataBag values) throws ExecException {
+            double sum = 0;
+            long count = 0;
+
+            Tuple output = mTupleFactory.newTuple(2);
+
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                sum += (Double) t.get(0);
+                count += (Long) t.get(1);
+            }
+
+            output.set(0, new Double(sum));
+            output.set(1, new Long(count));
+            return output;
+        }
+
+        static protected long count(Tuple input) throws ExecException {
+            DataBag values = (DataBag) input.get(0);
+            return values.size();
+        }
+
+        static protected double sum(Tuple input) throws ExecException {
+            DataBag values = (DataBag) input.get(0);
+
+            double sum = 0;
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                Double d = DataType.toDouble(t.get(0));
+                if (d == null)
+                    continue;
+                sum += d;
+            }
+
+            return sum;
+        }
+
+        @Override
+        public Schema outputSchema(Schema input) {
+            // TODO FIX
+            // return new AtomSchema("average");
+            return null;
+        }
+
+    }
+
+    @Test
+    public void testUserFuncArity() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        String funcSpec = ARITY.class.getName() + "()";
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
+                -1, inputs, funcSpec);
+        Result res = new Result();
+        Integer i = null;
+        res = userFunc.getNext(i);
+        while (res.returnStatus != POStatus.STATUS_EOP) {
+            // System.out.println(res.result);
+            int result = (Integer) res.result;
+            assertEquals(2, result);
+            res = userFunc.getNext(i);
+        }
+    }
+
+    @Test
+    public void testUDFCompare() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
+                100);
+        String funcSpec = WeirdComparator.class.getName() + "()";
+        POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()),
+                -1, null, funcSpec);
+        Iterator<Tuple> it = input.iterator();
+        Tuple t1 = it.next();
+        Tuple t2 = it.next();
+        t1.append(2);
+        t2.append(3);
+        ((POUserComparisonFunc)userFunc).attachInput(t1, t2);
+        Integer i = null;
+        // System.out.println(t1 + " " + t2);
+        int result = (Integer) (userFunc.getNext(i).result);
+        assertEquals(-1, result);
+    }
+
+    @Test
+    public void testAlgebraicAVG() throws IOException, ExecException {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+        byte INIT = 0;
+        byte INTERMED = 1;
+        byte FINAL = 2;
+        Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+                input);
+        Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+                input);
+        // System.out.println("Input = " + tup1);
+        String funcSpec = AVG.class.getName() + "()";
+
+        POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
+                null, funcSpec);
+
+        TupleFactory tf = TupleFactory.getInstance();
+
+        po.setAlgebraicFunction(INIT);
+        po.attachInput(tup1);
+        Tuple t = null;
+        Result res = po.getNext(t);
+        Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+                : null;
+        Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+                : null;
+        System.out.println(outputInitial1 + " " + outputInitial2);
+        assertEquals(outputInitial1, outputInitial2);
+        double sum = (Double) outputInitial1.get(0);
+        long count = (Long) outputInitial1.get(1);
+        assertEquals(55.0, sum);
+        assertEquals(10, count);
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
+        bag.add(outputInitial1);
+        bag.add(outputInitial2);
+        Tuple outputInitial = tf.newTuple();
+        outputInitial.append(bag);
+        // Tuple outputIntermed = intermed.exec(outputInitial);
+        po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+                funcSpec);
+        po.setAlgebraicFunction(INTERMED);
+        po.attachInput(outputInitial);
+        res = po.getNext(t);
+        Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+                : null;
+
+        sum = (Double) outputIntermed.get(0);
+        count = (Long) outputIntermed.get(1);
+        assertEquals(110.0, sum);
+        assertEquals(20, count);
+        System.out.println(outputIntermed);
+        po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+                funcSpec);
+        po.setAlgebraicFunction(FINAL);
+        po.attachInput(outputInitial);
+        res = po.getNext(t);
+        Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result
+                : null;
+        // Double output = fin.exec(outputInitial);
+        assertEquals(5.5, output);
+        // System.out.println("output = " + output);
 
-	}
+    }
 }