You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/08 00:17:08 UTC

svn commit: r654303 - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/impl/physicalLayer/plans/ src/org/apache/pig/impl/physicalLayer/topLevelOperators/ test/org/apache/pig/test/

Author: gates
Date: Wed May  7 15:17:03 2008
New Revision: 654303

URL: http://svn.apache.org/viewvc?rev=654303&view=rev
Log:
Shubham's changes to add POUserFunc, PODistinct, POSort.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Wed May  7 15:17:03 2008
@@ -142,9 +142,10 @@
         	    **/test/TestEqualTo.java,**/test/TestNotEqualTo.java, **/test/TestPOGenerate.java,
                 **/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
                  **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
-        		**/test/TestLocalRearrange.java,
+        		**/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
+        		**/test/TestPODistinct.java, **/test/TestPOSort.java,
                 **/test/FakeFSInputStream.java, **/test/Util.java,
-                **/logicalLayer/*.java,
+                **/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
                 **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
                 **/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
                 **/physicalLayer/Result.java,
@@ -252,6 +253,9 @@
                 	<include name="**/TestEqualTo.java" />
                 	<include name="**/TestNotEqualTo.java" />
                 	<include name="**/TestPOGenerate.java" />
+                	<include name="**/TestPOSort.java" />
+                	<include name="**/TestPOUserFunc.java" />
+                	<include name="**/TestPODistinct.java" />
                 	<include name="**/TestLoad.java" />
                 	<include name="**/TestStore.java" />
                 	<include name="**/TestPackage.java" />  

Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Wed May  7 15:17:03 2008
@@ -22,11 +22,12 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 
 public abstract class ComparisonFunc extends WritableComparator {
     public ComparisonFunc() {
-        super(Tuple.class);
+        super(TupleFactory.getInstance().tupleClass());
     }
 
     public int compare(WritableComparable a, WritableComparable b) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Wed May  7 15:17:03 2008
@@ -21,11 +21,15 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
@@ -101,5 +105,21 @@
         //do nothing
     }
 
+	public void visitDistinct(PODistinct distinct) {
+		//do nothing
+	}
+
+	public void visitRead(PORead read) {
+		//do nothing
+	}
+
+	public void visitSort(POSort sort) {
+		//do nothing
+	}
+
+	public void visitUserFunc(POUserFunc userFunc) {
+		//do nothing
+	}
+
 }
 

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java Wed May  7 15:17:03 2008
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This is a blocking operator. All the input is put in the hashset implemented
+ * in DistinctDataBag which also provides the other DataBag interfaces.
+ * 
+ * 
+ */
+public class PODistinct extends PhysicalOperator<PhyPlanVisitor> {
+
+	private boolean inputsAccumulated = false;
+	private DataBag distinctBag = BagFactory.getInstance().newDistinctBag();
+	private final Log log = LogFactory.getLog(getClass());
+	transient Iterator<Tuple> it;
+
+	public PODistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+		super(k, rp, inp);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PODistinct(OperatorKey k, int rp) {
+		super(k, rp);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PODistinct(OperatorKey k, List<PhysicalOperator> inp) {
+		super(k, inp);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PODistinct(OperatorKey k) {
+		super(k);
+		// TODO Auto-generated constructor stub
+	}
+
+	@Override
+	public boolean isBlocking() {
+		// Always true: getNext drains the whole input into distinctBag before it emits a tuple.
+		return true;
+	}
+
+	@Override
+	public Result getNext(Tuple t) throws ExecException {
+		if (!inputsAccumulated) {
+			Result in = processInput();
+			while (in.returnStatus != POStatus.STATUS_EOP) {
+				if (in.returnStatus == POStatus.STATUS_ERR) {
+					log.error("Error in reading from inputs");
+					continue;
+				} else if (in.returnStatus == POStatus.STATUS_NULL) {
+					continue;
+				}
+				distinctBag.add((Tuple) in.result);
+				in = processInput();
+			}
+			inputsAccumulated = true;
+		}
+		if (it == null) {
+			it = distinctBag.iterator();
+		}
+		res.result = it.next();
+		if (res.result == null)
+			res.returnStatus = POStatus.STATUS_EOP;
+		else
+			res.returnStatus = POStatus.STATUS_OK;
+		return res;
+
+	}
+
+	@Override
+	public String name() {
+		// TODO Auto-generated method stub
+		return "PODistinct - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public void visit(PhyPlanVisitor v) throws VisitorException {
+		// TODO Auto-generated method stub
+		v.visitDistinct(this);
+	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java Wed May  7 15:17:03 2008
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This operator is used to read tuples from a databag in memory. Used mostly
+ * for testing. It'd also be useful for the example generator
+ * 
+ */
+public class PORead extends PhysicalOperator<PhyPlanVisitor> {
+
+	DataBag bag;
+	transient Iterator<Tuple> it;
+
+	public PORead(OperatorKey k) {
+		super(k);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PORead(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+		super(k, rp, inp);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PORead(OperatorKey k, int rp) {
+		super(k, rp);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PORead(OperatorKey k, List<PhysicalOperator> inp) {
+		super(k, inp);
+		// TODO Auto-generated constructor stub
+	}
+
+	public PORead(OperatorKey k, DataBag bag) {
+		super(k);
+		this.bag = bag;
+	}
+
+	@Override
+	public Result getNext(Tuple t) {
+		if (it == null) {
+			it = bag.iterator();
+		}
+		Result res = new Result();
+		if (it.hasNext()) {
+			res.returnStatus = POStatus.STATUS_OK;
+			res.result = it.next();
+		} else {
+			res.returnStatus = POStatus.STATUS_EOP;
+		}
+		return res;
+	}
+
+	@Override
+	public String name() {
+		// TODO Auto-generated method stub
+		return "PORead - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public void visit(PhyPlanVisitor v) throws VisitorException {
+		// TODO Auto-generated method stub
+		v.visitRead(this);
+	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java Wed May  7 15:17:03 2008
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This implementation is applicable for both the physical plan and for the
+ * local backend, as the conversion of physical to mapreduce would see the SORT
+ * operator and take necessary steps to convert it to a quantile and a sort job.
+ * 
+ * This is a blocking operator. The sortedDataBag accumulates Tuples and sorts
+ * them only when an iterator is started. So all the tuples from the input
+ * operator should be accumulated and filled into the dataBag. The attachInput
+ * method is not applicable here.
+ * 
+ * 
+ */
+public class POSort extends PhysicalOperator<PhyPlanVisitor> {
+
+	//private List<Integer> mSortCols;
+	private List<ExprPlan> sortPlans;
+	private List<Byte> ExprOutputTypes;
+	private List<Boolean> mAscCols;
+	private POUserComparisonFunc mSortFunc;
+	private final Log log = LogFactory.getLog(getClass());
+
+	private boolean inputsAccumulated = false;
+	public boolean isUDFComparatorUsed = false;
+	private DataBag sortedBag;
+	transient Iterator<Tuple> it;
+
+	public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
+			List<Boolean> mAscCols, POUserFunc mSortFunc) {
+		super(k, rp, inp);
+		//this.mSortCols = mSortCols;
+		this.sortPlans = sortPlans;
+		this.mAscCols = mAscCols;
+		this.mSortFunc = (POUserComparisonFunc) mSortFunc;
+		if (mSortFunc == null) {
+			sortedBag = BagFactory.getInstance().newSortedBag(
+					new SortComparator());
+			ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+
+			for(ExprPlan plan : sortPlans) {
+				ExprOutputTypes.add(plan.getLeaves().get(0).resultType);
+			}
+		} else {
+			sortedBag = BagFactory.getInstance().newSortedBag(
+					new UDFSortComparator());
+			isUDFComparatorUsed = true;
+		}
+	}
+
+	public POSort(OperatorKey k, int rp, List inp) {
+		super(k, rp, inp);
+
+	}
+
+	public POSort(OperatorKey k, int rp) {
+		super(k, rp);
+
+	}
+
+	public POSort(OperatorKey k, List inp) {
+		super(k, inp);
+
+	}
+
+	public POSort(OperatorKey k) {
+		super(k);
+
+	}
+	
+	public class SortComparator implements Comparator<Tuple> {
+		public int compare(Tuple o1, Tuple o2) {
+			int count = 0;
+			int ret = 0;
+			if(sortPlans == null || sortPlans.size() == 0) 
+				return 0;
+			for(ExprPlan plan : sortPlans) {
+				try {
+					plan.attachInput(o1);
+					Result res1 = getResult(plan, ExprOutputTypes.get(count));
+					plan.attachInput(o2);
+					Result res2 = getResult(plan, ExprOutputTypes.get(count));
+					if(res1.returnStatus != POStatus.STATUS_OK || res2.returnStatus != POStatus.STATUS_OK) {
+						log.error("Error processing the input in the expression plan : " + plan.toString());
+					} else {
+						if(mAscCols.get(count ++))
+							ret = DataType.compare(res1.result, res2.result);
+						else
+							ret = DataType.compare(res2.result, res1.result);
+					}
+						
+				} catch (ExecException e) {
+					log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
+				}
+				
+			}
+			return ret;
+		} 
+		
+		private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
+			ExpressionOperator Op = plan.getLeaves().get(0);
+			Result res = null;
+			
+			switch (resultType) {
+            case DataType.BYTEARRAY:
+                res = Op.getNext(dummyDBA);
+                break;
+            case DataType.CHARARRAY:
+                res = Op.getNext(dummyString);
+                break;
+            case DataType.DOUBLE:
+                res = Op.getNext(dummyDouble);
+                break;
+            case DataType.FLOAT:
+                res = Op.getNext(dummyFloat);
+                break;
+            case DataType.INTEGER:
+                res = Op.getNext(dummyInt);
+                break;
+            case DataType.LONG:
+                res = Op.getNext(dummyLong);
+                break;
+            }
+			return res;
+		}
+	}
+
+	public class UDFSortComparator implements Comparator<Tuple> {
+
+		public int compare(Tuple t1, Tuple t2) {
+
+			mSortFunc.attachInput(t1, t2);
+			Integer i = null;
+			Result res = null;
+			try {
+				res = mSortFunc.getNext(i);
+			} catch (ExecException e) {
+
+				log.error("Input not ready. Error on reading from input. "
+						+ e.getMessage());
+			}
+			if (res != null)
+				return (Integer) res.result;
+			else
+				return 0;
+		}
+
+	}
+
+	@Override
+	public String name() {
+
+		return "POSort - " + mKey.toString();
+	}
+
+	@Override
+	public boolean isBlocking() {
+
+		return true;
+	}
+
+	@Override
+	public Result getNext(Tuple t) throws ExecException {
+		Result res = new Result();
+		if (!inputsAccumulated) {
+			res = processInput();
+			while (res.returnStatus != POStatus.STATUS_EOP) {
+				if (res.returnStatus == POStatus.STATUS_ERR) {
+					log.error("Error in reading from the inputs");
+					continue;
+				} else if (res.returnStatus == POStatus.STATUS_NULL) {
+					continue;
+				}
+				sortedBag.add((Tuple) res.result);
+				res = processInput();
+
+			}
+
+			inputsAccumulated = true;
+
+		}
+		if (it == null) {
+			it = sortedBag.iterator();
+		}
+		res.result = it.next();
+		if (res.result == null)
+			res.returnStatus = POStatus.STATUS_EOP;
+		else
+			res.returnStatus = POStatus.STATUS_OK;
+		return res;
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+
+		return false;
+	}
+
+	@Override
+	public void visit(PhyPlanVisitor v) throws VisitorException {
+
+		v.visitSort(this);
+	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java Wed May  7 15:17:03 2008
@@ -0,0 +1,124 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+
+public class POUserComparisonFunc extends POUserFunc {
+
+	transient ComparisonFunc func;
+	private Log log = LogFactory.getLog(getClass());
+	
+	public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) {
+		super(k, rp, inp);
+		this.funcSpec = funcSpec;
+		this.func = func;		
+	}
+	
+	public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+		this(k, rp, inp, funcSpec, null);
+		
+		instantiateFunc();
+	}
+	
+	private void instantiateFunc() {
+		this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+	}
+	
+	public ComparisonFunc getComparator() {
+		if (func == null)
+			instantiateFunc();
+		return func;
+	}
+	
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		Result result = new Result();
+
+		if (func == null)
+			instantiateFunc();
+
+
+		result.result = func.compare(t1, t2);
+		result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
+				: POStatus.STATUS_ERR;
+		// the two attached tuples are used up now. So we set the
+		// inputAttached flag to false
+		inputAttached = false;
+		return result;
+
+	}
+	
+	private Result getNext() { // shared body of every non-Integer getNext overload
+		log.error("getNext being called with non-integer");
+		Result res = new Result(); // non-null so a caller can inspect returnStatus
+		res.returnStatus = POStatus.STATUS_ERR;
+		return res;
+	}
+	@Override
+	public Result getNext(Boolean b) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataBag db) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Map m) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Tuple in) throws ExecException {
+		return getNext();
+	}
+
+	public void attachInput(Tuple t1, Tuple t2) {
+		if (func == null)
+			instantiateFunc();
+
+		this.t1 = t1;
+		this.t2 = t2;
+		inputAttached = true;
+
+	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java Wed May  7 15:17:03 2008
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POUserFunc extends PhysicalOperator<PhyPlanVisitor> {
+
+	transient EvalFunc func;
+	Tuple t1, t2;
+	private final Log log = LogFactory.getLog(getClass());
+	String funcSpec;
+	private final byte INITIAL = 0;
+	private final byte INTERMEDIATE = 1;
+	private final byte FINAL = 2;
+
+	public POUserFunc(OperatorKey k, int rp, List inp) {
+		super(k, rp);
+		inputs = inp;
+
+	}
+
+	public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+		this(k, rp, inp, funcSpec, null);
+
+		instantiateFunc();
+	}
+	
+	public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec, EvalFunc func) {
+		super(k, rp, inp);
+		this.funcSpec = funcSpec;
+		this.func = func;
+
+	}
+
+	private void instantiateFunc() {
+		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+	}
+
+	/**
+	 * Runs the UDF once: on the attached input when there is one, otherwise on the next
+	 * tuple from the first input. An IOException from the UDF is logged and reported as STATUS_ERR.
+	 */
+	private Result getNext() throws ExecException {
+		Tuple t = null;
+		Result result = new Result();
+		if (func == null)
+			instantiateFunc();
+		try {
+			if (inputAttached) {
+				result.result = func.exec(input);
+				result.returnStatus = (result.result != null) ? POStatus.STATUS_OK : POStatus.STATUS_EOP;
+				return result;
+			}
+			Result in = inputs.get(0).getNext(t);
+			// do not run the UDF on an end-of-input or failed read; pass the status on
+			if (in.returnStatus == POStatus.STATUS_EOP || in.returnStatus == POStatus.STATUS_ERR) {
+				result.returnStatus = in.returnStatus;
+				return result;
+			}
+			result.result = func.exec((Tuple) in.result);
+			result.returnStatus = POStatus.STATUS_OK;
+			return result;
+		} catch (IOException e) {
+			log.error(e);
+		}
+		result.returnStatus = POStatus.STATUS_ERR;
+		return result;
+	}
+
+	@Override
+	public Result getNext(Tuple tIn) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataBag db) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Boolean b) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Map m) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+
+		return getNext();
+	}
+
+	public void setAlgebraicFunction(Byte Function) {
+		// This will only be used by the optimizer for putting correct functions
+		// in the mapper,
+		// combiner and reduce. This helps in maintaining the physical plan as
+		// is without the
+		// optimiser having to replace any operators.
+		// You wouldn't be able to make two calls to this function on the same
+		// algebraic EvalFunc as
+		// func is being changed.
+		switch (Function) {
+		case INITIAL:
+			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getInitial());
+			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+			break;
+		case INTERMEDIATE:
+			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getIntermed());
+			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+			break;
+		case FINAL:
+			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getFinal());
+			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+			break;
+
+		}
+	}
+
+	public String getInitial() {
+		if (func == null)
+			instantiateFunc();
+
+		if (func instanceof Algebraic) {
+			return ((Algebraic) func).getInitial();
+		} else {
+			log
+					.error("Attempt to run a non-algebraic function as an algebraic function");
+		}
+		return null;
+	}
+
+	public String getIntermed() {
+		if (func == null)
+			instantiateFunc();
+
+		if (func instanceof Algebraic) {
+			return ((Algebraic) func).getIntermed();
+		} else {
+			log
+					.error("Attempt to run a non-algebraic function as an algebraic function");
+		}
+		return null;
+	}
+
+	public String getFinal() {
+		if (func == null)
+			instantiateFunc();
+
+		if (func instanceof Algebraic) {
+			return ((Algebraic) func).getFinal();
+		} else {
+			log
+					.error("Attempt to run a non-algebraic function as an algebraic function");
+		}
+		return null;
+	}
+
+	public Type getReturnType() {
+		if (func == null)
+			instantiateFunc();
+
+		return func.getReturnType();
+	}
+
+	public void finish() {
+		if (func == null)
+			instantiateFunc();
+
+		func.finish();
+	}
+
+	public Schema outputSchema(Schema input) {
+		if (func == null)
+			instantiateFunc();
+
+		return func.outputSchema(input);
+	}
+
+	public Boolean isAsynchronous() {
+		if (func == null)
+			instantiateFunc();
+		
+		return func.isAsynchronous();
+	}
+
+	@Override
+	public String name() {
+
+		return "POUserFunc - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+
+		return false;
+	}
+
+	@Override
+	public void visit(PhyPlanVisitor v) throws VisitorException {
+
+		v.visitUserFunc(this);
+	}
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java Wed May  7 15:17:03 2008
@@ -34,7 +34,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestConstExpr {
+public class TestConstExpr extends junit.framework.TestCase {
     Random r = new Random();
     ConstantExpression ce = (ConstantExpression) GenPhyOp.exprConst();
     

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java Wed May  7 15:17:03 2008
@@ -36,7 +36,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestEqualTo {
+public class TestEqualTo extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java Wed May  7 15:17:03 2008
@@ -42,7 +42,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestFilter {
+public class TestFilter extends junit.framework.TestCase {
     POFilter pass;
 
     POFilter fail;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java Wed May  7 15:17:03 2008
@@ -52,7 +52,7 @@
  * as those in the projected bag 
  *
  */
-public class TestForEach {
+public class TestForEach extends junit.framework.TestCase {
     
     POForEach fe;
     Tuple t;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java Wed May  7 15:17:03 2008
@@ -37,7 +37,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestGTOrEqual {
+public class TestGTOrEqual extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java Wed May  7 15:17:03 2008
@@ -37,7 +37,7 @@
 
 import static org.junit.Assert.*;
 
-public class TestGreaterThan {
+public class TestGreaterThan extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java Wed May  7 15:17:03 2008
@@ -37,7 +37,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestLTOrEqual {
+public class TestLTOrEqual extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java Wed May  7 15:17:03 2008
@@ -37,7 +37,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestLessThan {
+public class TestLessThan extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java Wed May  7 15:17:03 2008
@@ -41,7 +41,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestLoad {
+public class TestLoad extends junit.framework.TestCase {
     FileSpec inpFSpec;
     POLoad ld;
     PigContext pc;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Wed May  7 15:17:03 2008
@@ -47,7 +47,7 @@
  * group db by $0 
  *
  */
-public class TestLocalRearrange {
+public class TestLocalRearrange extends junit.framework.TestCase {
     
     POLocalRearrange lr;
     Tuple t;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java Wed May  7 15:17:03 2008
@@ -36,7 +36,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestNotEqualTo {
+public class TestNotEqualTo extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java Wed May  7 15:17:03 2008
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Feeds PODistinct a bag of random single-field tuples and checks that every
+ * distinct input tuple comes back exactly once.
+ */
+public class TestPODistinct extends TestCase {
+	DataBag input = BagFactory.getInstance().newDefaultBag();
+	Random r = new Random();
+	final int MAX_VALUE = 10;
+	final int MAX_SAMPLES = 100;
+
+	// Every distinct tuple placed into the input bag, mapped to the number of
+	// times the operator is expected to emit it (always 1).
+	Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
+
+	/**
+	 * Fills the input bag with MAX_SAMPLES tuples, each holding one random int
+	 * below MAX_VALUE, and records the distinct tuples that were generated.
+	 */
+	@Before
+	public void setUp() {
+		TupleFactory tf = TupleFactory.getInstance();
+		for (int i = 0; i < MAX_SAMPLES; i++) {
+			Tuple t = tf.newTuple();
+			t.append(r.nextInt(MAX_VALUE));
+			input.add(t);
+			expected.put(t, 1);
+		}
+	}
+
+	/**
+	 * Drains the operator until STATUS_EOP, counting how often each tuple is
+	 * produced. The previous version only verified the absence of duplicates,
+	 * so an operator that emitted nothing (or dropped values) still passed;
+	 * the output is now compared against the expected distinct tuples.
+	 */
+	@Test
+	public void testPODistict() throws ExecException {
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
+				-1, inputs);
+
+		Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
+		Tuple t = null;
+		// As before, the previously returned tuple is passed to the next call.
+		for (Result res = distinct.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = distinct
+				.getNext(t)) {
+			t = (Tuple) res.result;
+			Integer seen = output.get(t);
+			output.put(t, seen == null ? 1 : seen + 1);
+		}
+
+		for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
+			assertEquals("tuple emitted more than once: " + e.getKey(), 1, e
+					.getValue().intValue());
+		}
+		// Completeness: nothing missing and nothing invented.
+		assertEquals(expected, output);
+	}
+
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Wed May  7 15:17:03 2008
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.Test;
+
+/**
+ * Exercises POSort over small random bags: ascending/descending sorts on a
+ * projected column and a sort driven by a user-supplied comparison function.
+ */
+public class TestPOSort extends TestCase {
+	Random r = new Random();
+	int MAX_TUPLES = 10;
+
+	@Test
+	public void testPOSortAscString() throws ExecException {
+		POProject key = project(0);
+		key.setResultType(DataType.CHARARRAY);
+		assertSorted(sortBy(key, true), 0, true);
+	}
+
+	@Test
+	public void testPOSortDescString() throws ExecException {
+		POProject key = project(0);
+		key.setResultType(DataType.CHARARRAY);
+		assertSorted(sortBy(key, false), 0, false);
+	}
+
+	@Test
+	public void testPOSortAsc() throws ExecException {
+		POProject key = project(1);
+		key.setResultType(DataType.INTEGER);
+		assertSorted(sortBy(key, true), 1, true);
+	}
+
+	@Test
+	public void testPOSortDesc() throws ExecException {
+		POProject key = project(1);
+		key.setResultType(DataType.INTEGER);
+		assertSorted(sortBy(key, false), 1, false);
+	}
+
+	/**
+	 * Sorts with WeirdComparator and checks that consecutive tuples never
+	 * move closer to 50 (judged on field 1).
+	 */
+	@Test
+	public void testPOSortUDF() throws ExecException {
+		String funcName = WeirdComparator.class.getName() + "()";
+		POUserFunc comparator = new POUserComparisonFunc(nextKey(), -1, null,
+				funcName);
+		POSort sort = new POSort(nextKey(), -1, readerOver(randomInput()),
+				null, null, comparator);
+
+		Tuple t = null;
+		Result previous = sort.getNext(t);
+		Result current = sort.getNext(t);
+		while (current.returnStatus != POStatus.STATUS_EOP) {
+			int i1 = (Integer) ((Tuple) previous.result).get(1);
+			int i2 = (Integer) ((Tuple) current.result).get(1);
+			int d1 = (i1 - 50) * (i1 - 50);
+			int d2 = (i2 - 50) * (i2 - 50);
+			assertTrue("out of order: " + previous.result + " before "
+					+ current.result, d1 <= d2);
+			previous = current;
+			current = sort.getNext(t);
+		}
+	}
+
+	/** Creates an operator key with an empty scope and a random id. */
+	private OperatorKey nextKey() {
+		return new OperatorKey("", r.nextLong());
+	}
+
+	/** Generates the random bag of MAX_TUPLES small tuples used by every test. */
+	private DataBag randomInput() {
+		return (DataBag) GenRandomData.genRandSmallTupDataBag(r, MAX_TUPLES,
+				100);
+	}
+
+	/** Wraps the bag in a PORead and returns it as a one-element input list. */
+	private List<PhysicalOperator> readerOver(DataBag input) {
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(new PORead(nextKey(), input));
+		return inputs;
+	}
+
+	/** Creates a projection of the given column; the caller sets its type. */
+	private POProject project(int column) {
+		return new POProject(nextKey(), -1, column);
+	}
+
+	/**
+	 * Builds a POSort over a fresh random bag, using a single sort plan that
+	 * contains only the given projection and no user comparator.
+	 */
+	private POSort sortBy(POProject key, boolean ascending)
+			throws ExecException {
+		ExprPlan plan = new ExprPlan();
+		plan.add(key);
+		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+		sortPlans.add(plan);
+		List<Boolean> ascCols = new LinkedList<Boolean>();
+		ascCols.add(ascending);
+		return new POSort(nextKey(), -1, readerOver(randomInput()), sortPlans,
+				ascCols, null);
+	}
+
+	/**
+	 * Drains the sort until STATUS_EOP and asserts that each consecutive pair
+	 * of tuples is ordered on the given column according to
+	 * DataType.compare.
+	 */
+	private void assertSorted(POSort sort, int column, boolean ascending)
+			throws ExecException {
+		Tuple t = null;
+		Result previous = sort.getNext(t);
+		Result current = sort.getNext(t);
+		while (current.returnStatus != POStatus.STATUS_EOP) {
+			Object left = ((Tuple) previous.result).get(column);
+			Object right = ((Tuple) current.result).get(column);
+			int cmp = DataType.compare(left, right);
+			assertTrue("out of order: " + previous.result + " before "
+					+ current.result, ascending ? cmp <= 0 : cmp >= 0);
+			previous = current;
+			current = sort.getNext(t);
+		}
+	}
+
+	// sorts values in ascending order of their distance from 50
+	public static class WeirdComparator extends ComparisonFunc {
+
+		/**
+		 * Compares the squared distance from 50 of field 1 of both tuples.
+		 * The subtraction is only safe for small values; the test data is
+		 * presumably bounded by 100 - confirm against GenRandomData.
+		 *
+		 * @throws RuntimeException
+		 *             if field 1 cannot be read; previously the failure was
+		 *             printed and the tuples were reported as equal
+		 */
+		@Override
+		public int compare(Tuple t1, Tuple t2) {
+			try {
+				int i1 = (Integer) t1.get(1);
+				int i2 = (Integer) t2.get(1);
+				return (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+			} catch (ExecException e) {
+				throw new RuntimeException("Unable to read field 1 of " + t1
+						+ " or " + t2, e);
+			}
+		}
+
+	}
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Wed May  7 15:17:03 2008
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.Test;
+
+public class TestPOUserFunc extends TestCase {
+	Random r = new Random();
+	int MAX_TUPLES = 10;
+
+	/** Test function that returns the number of fields in its input tuple. */
+	public static class ARITY extends EvalFunc<Integer> {
+
+		/**
+		 * @param input
+		 *            tuple whose size is reported
+		 * @return {@code input.size()} boxed as an Integer
+		 */
+		@Override
+		public Integer exec(Tuple input) throws IOException {
+			// valueOf instead of the boxing constructor: no needless allocation
+			return Integer.valueOf(input.size());
+		}
+
+		/** Not implemented yet; always returns {@code null}. */
+		@Override
+		public Schema outputSchema(Schema input) {
+			// TODO FIX
+			// return new AtomSchema("arity");
+			return null;
+		}
+	}
+
+	/**
+	 * Test comparator: orders tuples by the squared distance of field 2 from
+	 * the value 2.
+	 */
+	public static class WeirdComparator extends ComparisonFunc {
+
+		/**
+		 * @return {@code (a - 2)^2 - (b - 2)^2} where a and b are field 2 of
+		 *         t1 and t2
+		 * @throws RuntimeException
+		 *             if field 2 cannot be read. Previously the failure was
+		 *             printed and then surfaced as a NullPointerException
+		 *             when the missing values were unboxed.
+		 */
+		@Override
+		public int compare(Tuple t1, Tuple t2) {
+			int i1;
+			int i2;
+			try {
+				i1 = (Integer) t1.get(2) - 2;
+				i2 = (Integer) t2.get(2) - 2;
+			} catch (ExecException e) {
+				throw new RuntimeException("Unable to read field 2 of " + t1
+						+ " or " + t2, e);
+			}
+
+			return i1 * i1 - i2 * i2;
+		}
+
+	}
+
+	/**
+	 * Generates the average of the values of the first field of a tuple. This
+	 * class is Algebraic in implementation, so if possible the execution will be
+	 * split into a local and global application.
+	 * <p>
+	 * Failures while reading the input are reported to the caller with their
+	 * cause attached instead of being printed and turned into a 0 or
+	 * {@code null} result.
+	 */
+	public static class AVG extends EvalFunc<Double> implements Algebraic {
+
+		private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+		/**
+		 * Averages the first field of every tuple in the bag held in field 0
+		 * of {@code input}.
+		 *
+		 * @return the average, or 0 when the bag is empty
+		 * @throws IOException
+		 *             if the input cannot be read
+		 */
+		@Override
+		public Double exec(Tuple input) throws IOException {
+			try {
+				return average(sum(input), count(input));
+			} catch (ExecException e) {
+				throw asIOException(e, input);
+			}
+		}
+
+		public String getInitial() {
+			return Initial.class.getName();
+		}
+
+		public String getIntermed() {
+			return Intermed.class.getName();
+		}
+
+		public String getFinal() {
+			return Final.class.getName();
+		}
+
+		/** Produces a (sum, count) tuple for the bag in field 0 of the input. */
+		static public class Initial extends EvalFunc<Tuple> {
+			@Override
+			public Tuple exec(Tuple input) throws IOException {
+				try {
+					Tuple t = mTupleFactory.newTuple(2);
+					t.set(0, Double.valueOf(sum(input)));
+					t.set(1, Long.valueOf(count(input)));
+					return t;
+				} catch (ExecException e) {
+					// Same exception type and message as before, now with
+					// the cause preserved.
+					throw new RuntimeException(e.getMessage() + ": " + input,
+							e);
+				}
+			}
+		}
+
+		/** Folds a bag of (sum, count) tuples into a single (sum, count). */
+		static public class Intermed extends EvalFunc<Tuple> {
+			@Override
+			public Tuple exec(Tuple input) throws IOException {
+				try {
+					return combine((DataBag) input.get(0));
+				} catch (ExecException e) {
+					throw asIOException(e, input);
+				}
+			}
+		}
+
+		/** Folds a bag of (sum, count) tuples and returns sum / count. */
+		static public class Final extends EvalFunc<Double> {
+			@Override
+			public Double exec(Tuple input) throws IOException {
+				try {
+					Tuple combined = combine((DataBag) input.get(0));
+					double sum = (Double) combined.get(0);
+					double count = (Long) combined.get(1);
+					return average(sum, count);
+				} catch (ExecException e) {
+					throw asIOException(e, input);
+				}
+			}
+		}
+
+		/** Returns sum / count, or 0 when count is not positive. */
+		private static Double average(double sum, double count) {
+			return Double.valueOf(count > 0 ? sum / count : 0);
+		}
+
+		/** Wraps the failure in an IOException that keeps the cause. */
+		private static IOException asIOException(ExecException cause,
+				Tuple input) {
+			IOException wrapped = new IOException(cause.getMessage() + ": "
+					+ input);
+			wrapped.initCause(cause);
+			return wrapped;
+		}
+
+		/**
+		 * Adds up field 0 (Double) and field 1 (Long) of every tuple in the
+		 * bag.
+		 *
+		 * @return a two-field tuple holding the total sum and the total count
+		 */
+		static protected Tuple combine(DataBag values) throws ExecException {
+			double sum = 0;
+			long count = 0;
+
+			for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+				Tuple t = it.next();
+				sum += (Double) t.get(0);
+				count += (Long) t.get(1);
+			}
+
+			Tuple output = mTupleFactory.newTuple(2);
+			output.set(0, Double.valueOf(sum));
+			output.set(1, Long.valueOf(count));
+			return output;
+		}
+
+		/** @return the size of the bag stored in field 0 of the input */
+		static protected long count(Tuple input) throws ExecException {
+			DataBag values = (DataBag) input.get(0);
+			return values.size();
+		}
+
+		/**
+		 * Sums field 0 of every tuple in the bag stored in field 0 of the
+		 * input; values that DataType.toDouble maps to {@code null} are
+		 * skipped.
+		 */
+		static protected double sum(Tuple input) throws ExecException {
+			DataBag values = (DataBag) input.get(0);
+
+			double sum = 0;
+			for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+				Tuple t = it.next();
+				Double d = DataType.toDouble(t.get(0));
+				if (d == null)
+					continue;
+				sum += d;
+			}
+
+			return sum;
+		}
+
+		/** Not implemented yet; always returns {@code null}. */
+		@Override
+		public Schema outputSchema(Schema input) {
+			// TODO FIX
+			// return new AtomSchema("average");
+			return null;
+		}
+
+	}
+
+	/**
+	 * Runs ARITY through POUserFunc over a random bag and expects the value 2
+	 * for every result returned before STATUS_EOP.
+	 */
+	@Test
+	public void testUserFuncArity() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+				MAX_TUPLES, 100);
+		String funcSpec = ARITY.class.getName() + "()";
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
+				-1, inputs, funcSpec);
+
+		// Only selects the Integer overload of getNext.
+		Integer i = null;
+		// The throw-away "new Result()" that used to be created and then
+		// immediately overwritten is gone.
+		for (Result res = userFunc.getNext(i); res.returnStatus != POStatus.STATUS_EOP; res = userFunc
+				.getNext(i)) {
+			int result = (Integer) res.result;
+			assertEquals(2, result);
+		}
+	}
+
+	/**
+	 * Appends 2 and 3 as the third field of two random tuples and expects
+	 * WeirdComparator, run through POUserComparisonFunc, to yield -1.
+	 */
+	@Test
+	public void testUDFCompare() throws ExecException {
+		DataBag pair = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
+				100);
+		String spec = WeirdComparator.class.getName() + "()";
+		OperatorKey key = new OperatorKey("", r.nextLong());
+		POUserFunc userFunc = new POUserComparisonFunc(key, -1, null, spec);
+
+		Iterator<Tuple> tuples = pair.iterator();
+		Tuple first = tuples.next();
+		Tuple second = tuples.next();
+		first.append(2);
+		second.append(3);
+
+		POUserComparisonFunc comparison = (POUserComparisonFunc) userFunc;
+		comparison.attachInput(first, second);
+
+		// Only selects the Integer overload of getNext.
+		Integer i = null;
+		Result outcome = userFunc.getNext(i);
+		int result = (Integer) outcome.result;
+		assertEquals(-1, result);
+	}
+
+	/**
+	 * Drives AVG through its three algebraic phases: the initial function on
+	 * two identical inputs, then the intermediate and final functions on the
+	 * bag of both initial results.
+	 * <p>
+	 * The second input used to be built but never evaluated, and the second
+	 * initial result was read from the same Result as the first, so their
+	 * equality check could not fail. Both inputs are now evaluated.
+	 */
+	@Test
+	public void testAlgebraicAVG() throws IOException, ExecException {
+		int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+		byte INIT = 0;
+		byte INTERMED = 1;
+		byte FINAL = 2;
+		Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+				input);
+		Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+				input);
+		String funcSpec = AVG.class.getName() + "()";
+		TupleFactory tf = TupleFactory.getInstance();
+		Tuple t = null;
+
+		// Initial phase on the first input.
+		POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
+				null, funcSpec);
+		po.setAlgebraicFunction(INIT);
+		po.attachInput(tup1);
+		Result res = po.getNext(t);
+		Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+				: null;
+		assertNotNull(outputInitial1);
+
+		// Initial phase on the second input, with a fresh operator as is done
+		// for the later phases.
+		po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+				funcSpec);
+		po.setAlgebraicFunction(INIT);
+		po.attachInput(tup2);
+		res = po.getNext(t);
+		Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+				: null;
+		assertNotNull(outputInitial2);
+
+		assertEquals(outputInitial1, outputInitial2);
+		double sum = (Double) outputInitial1.get(0);
+		long count = (Long) outputInitial1.get(1);
+		assertEquals(55.0, sum, 0.0);
+		assertEquals(10, count);
+
+		DataBag bag = BagFactory.getInstance().newDefaultBag();
+		bag.add(outputInitial1);
+		bag.add(outputInitial2);
+		Tuple outputInitial = tf.newTuple();
+		outputInitial.append(bag);
+
+		// Intermediate phase over both partial results.
+		po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+				funcSpec);
+		po.setAlgebraicFunction(INTERMED);
+		po.attachInput(outputInitial);
+		res = po.getNext(t);
+		Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+				: null;
+		assertNotNull(outputIntermed);
+
+		sum = (Double) outputIntermed.get(0);
+		count = (Long) outputIntermed.get(1);
+		assertEquals(110.0, sum, 0.0);
+		assertEquals(20, count);
+
+		// Final phase over the same bag of partial results.
+		po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+				funcSpec);
+		po.setAlgebraicFunction(FINAL);
+		po.attachInput(outputInitial);
+		res = po.getNext(t);
+		Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result
+				: null;
+		assertNotNull(output);
+		assertEquals(5.5, output.doubleValue(), 0.0);
+	}
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Wed May  7 15:17:03 2008
@@ -39,7 +39,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestPackage {
+public class TestPackage extends junit.framework.TestCase {
 
     @Before
     public void setUp() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java Wed May  7 15:17:03 2008
@@ -37,7 +37,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestPhyOp {
+public class TestPhyOp extends junit.framework.TestCase {
     PhysicalOperator<PhyPlanVisitor> op;
 
     PhysicalOperator<PhyPlanVisitor> inpOp;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Wed May  7 15:17:03 2008
@@ -37,7 +37,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestProject {
+public class TestProject extends  junit.framework.TestCase {
     Random r;
 
     Tuple t;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java Wed May  7 15:17:03 2008
@@ -47,7 +47,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestStore {
+public class TestStore extends junit.framework.TestCase {
     POStore st;
     FileSpec fSpec;
     DataBag inpDB;