You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/03/31 20:33:14 UTC

svn commit: r643092 - in /incubator/pig/branches/types: ./ src/org/apache/pig/impl/physicalLayer/ src/org/apache/pig/impl/physicalLayer/plans/ src/org/apache/pig/impl/physicalLayer/topLevelOperators/ src/org/apache/pig/impl/physicalLayer/topLevelOperat...

Author: gates
Date: Mon Mar 31 11:33:08 2008
New Revision: 643092

URL: http://svn.apache.org/viewvc?rev=643092&view=rev
Log:
Checked in Shravan's first incr1 patch, which contains the first version of a few of the new physical operators.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=643092&r1=643091&r2=643092&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon Mar 31 11:33:08 2008
@@ -132,7 +132,16 @@
     </target>
 
     <target name="compile-sources">
-        <javac encoding="${build.encoding}" srcdir="${sources}" includes="**/plan/*.java, **/data/*.java, **/pig/builtin/*.java, **/test/TestOperatorPlan.java, **/test/TestBuiltin.java, **/logicalLayer/*.java, **/logicalLayer/schema/*.java  " destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}">
+        <javac encoding="${build.encoding}" srcdir="${sources}"
+            includes="**/plan/*.java, **/data/*.java, **/pig/builtin/*.java,
+                **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
+                **/test/TestConstExpr.java, **/test/TestFilter.java, **/test/TestPhyOp.java,
+                **/test/TestProject.java, **/test/utils/*.java, **/logicalLayer/*.java,
+                **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
+                **/physicalLayer/topLevelOperator/**/*.java, **/physicalLayer/plans/*.java,
+                **/physicalLayer/util/*.java, **/physicalLayer/Result.java,
+                **/physicalLayer/POStatus.java"
+            destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}">
             <compilerarg line="${javac.args} ${javac.args.warnings}" />
             <classpath refid="${cp}" />
         </javac>
@@ -216,6 +225,10 @@
                 <fileset dir="test">
                     <include name="**/TestBuiltin.java" />
                     <include name="**/TestOperatorPlan.java" />
+                	<include name="**/TestPhyOp.java" />
+                	<include name="**/TestConstExpr.java" />
+                	<include name="**/TestProject.java" />
+                	<include name="**/TestFilter.java" />
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,10 @@
+package org.apache.pig.impl.physicalLayer;
+
+public class POStatus {
+	public static final byte STATUS_OK = 0;
+	public static final byte STATUS_NULL = 1;
+	public static final byte STATUS_ERR = 2;
+	public static final byte STATUS_EOP = 3; // end of processing 
+	
+	public static Object result;
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,17 @@
+package org.apache.pig.impl.physicalLayer;
+
+import java.io.Serializable;
+
+public class Result implements Serializable{
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	public byte returnStatus;
+	public Object result;
+	
+	public Result(){
+		returnStatus = POStatus.STATUS_ERR;
+		result = null;
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,16 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+
+/**
+ * A plan that supports expressions by combining the
+ * expression operators. This will be used by top level
+ * operators like filter and foreach that have Expression Plans
+ * as their attribute plan.
+ *
+ */
+public class ExprPlan extends PhysicalPlan<ExpressionOperator> {
+	public ExprPlan() {
+		super();
+	}
+}
\ No newline at end of file

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,63 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LTOrEqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LessThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
+
+/**
+ * The visitor to be used for visiting expression plans.
+ * Create the visitor with the ExpressionPlan to be visited.
+ * Call the visit() method for a depth first traversal.
+ *
+ */
+public abstract class ExprPlanVisitor extends PhyPlanVisitor<ExpressionOperator, ExprPlan> {
+
+	private final Log log = LogFactory.getLog(getClass());
+	
+	public ExprPlanVisitor(ExprPlan plan) {
+		super(plan);
+		// TODO Auto-generated constructor stub
+	}
+	
+	public void visitConstant(ConstantExpression cnst){
+		//do nothing
+	}
+	
+	public void visitProject(POProject proj){
+		//do nothing
+	}
+	
+	public void visitGreaterThan(GreaterThanExpr grt){
+		//do nothing
+	}
+	
+	/*public void visitLessThan(LessThanExpr lt){
+		//do nothing
+	}
+	
+	public void visitGTOrEqual(GTOrEqualToExpr gte){
+		//do nothing
+	}
+	
+	public void visiLTOrEqual(LTOrEqualToExpr lte){
+		//do nothing
+	}
+	
+	public void visitEqualTo(EqualToExpr eq){
+		//do nothing
+	}
+	
+	public void visitNotEqualTo(NotEqualToExpr eq){
+		//do nothing
+	}*/
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,67 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+/**
+ * The visitor class for the Physical Plan. To use this,
+ * create the visitor with the plan to be visited. Call 
+ * the visit() method to traverse the plan in a depth first
+ * fashion.
+ * 
+ * This class can be used to visit only the top level 
+ * relational operators. Classes extending this should
+ * use the specific plans to visit the attribute plans
+ * of each of the top level operators.
+ *
+ * @param <O>
+ * @param <P>
+ */
+public abstract class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
+
+	public PhyPlanVisitor(P plan) {
+		super(plan);
+	}
+
+	@Override
+	public void visit() throws ParseException {
+		depthFirst();
+	}
+	
+//	public void visitLoad(POLoad ld){
+//		//do nothing
+//	}
+//	
+//	public void visitStore(POStore st){
+//		//do nothing
+//	}
+//	
+	public void visitFilter(POFilter fl){
+		//do nothing
+	}
+//	
+//	public void visitLocalRearrange(POLocalRearrange lr){
+//		//do nothing
+//	}
+//	
+//	public void visitGlobalRearrange(POGlobalRearrange gr){
+//		//do nothing
+//	}
+//	
+//	public void visitStartMap(StartMap sm){
+//		//do nothing
+//	}
+//	
+//	public void visitPackage(POPackage pkg){
+//		//do nothing
+//	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,38 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/**
+ * 
+ * The base class for all types of physical plans. 
+ * This extends the Operator Plan.
+ *
+ * @param <E>
+ */
+public abstract class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
+
+	public PhysicalPlan() {
+		super();
+	}
+	
+	public void attachInput(Tuple t){
+		List<E> roots = getRoots();
+		for (E operator : roots)
+			operator.attachInput(t);
+	}
+	
+	/**
+	 * Write a visual representation of the Physical Plan
+	 * into the given output stream
+	 * @param out : OutputStream to which the visual representation is written
+	 */
+	public void explain(OutputStream out){
+		//Use a plan visitor and output the current physical
+		//plan into out
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,162 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
+
+
+/**
+ * This is an implementation of the Filter operator. It has
+ * an Expression Plan that decides whether the input tuple
+ * should be filtered or passed through. To avoid many function
+ * calls, the filter operator, stores the Comparison Operator
+ * that is the root of the Expression Plan and uses its getNext
+ * directly.
+ * 
+ * Since the filter is supposed to return tuples only, getNext
+ * is not supported on any other data type.
+ *
+ */
+public class POFilter extends PhysicalOperator<PhyPlanVisitor> {
+
+	private Log log = LogFactory.getLog(getClass());
+	
+	//The expression plan
+	ExprPlan plan;
+	
+	//The root comparison operator of the expression plan
+	ComparisonOperator comOp;
+	
+	//The operand type for the comparison operator needed
+	//to call the comparison operators getNext with the 
+	//appropriate type
+	byte compOperandType;
+	
+	//Dummy types used to access the getNext of appropriate
+	//type
+	DataByteArray ba;
+	String s;
+	Double d;
+	Float f;
+	Integer i;
+	Long l;
+	
+
+	public POFilter(OperatorKey k) {
+		this(k, -1, null);
+	}
+
+	public POFilter(OperatorKey k, int rp) {
+		this(k, rp, null);
+	}
+	
+	public POFilter(OperatorKey k, List<PhysicalOperator<PhyPlanVisitor>> inputs){
+		this(k,-1,inputs);
+	}
+
+	public POFilter(OperatorKey k, int rp, List<PhysicalOperator<PhyPlanVisitor>> inputs) {
+		super(k, rp, inputs);
+	}
+	
+	/**
+	 * Attaches the proccesed input tuple to the expression plan
+	 * and checks if comparison operator returns a true. If so the
+	 * tuple is not filtered and let to pass through. Else, further
+	 * input is processed till a tuple that can be passed through is
+	 * found or EOP is reached.
+	 */
+	@Override
+	public Result getNext(Tuple t) throws ExecException {
+		Result res = null;
+		Result inp = null;
+		while(true){
+			inp = processInput();
+			if(inp.returnStatus==POStatus.STATUS_EOP)
+				break;
+			if(inp.returnStatus==POStatus.STATUS_NULL || inp.returnStatus==POStatus.STATUS_ERR){
+				log.warn("An erroneous/Null return status was returned by processInput(). Continuing with further input.");
+				continue;
+			}
+			
+			plan.attachInput((Tuple)inp.result);
+			
+			switch(compOperandType){
+			case DataType.BYTEARRAY:
+				res = comOp.getNext(ba);
+				if(res.returnStatus!=POStatus.STATUS_OK) continue;
+				break;
+			case DataType.CHARARRAY:
+				res = comOp.getNext(s);
+				if(res.returnStatus!=POStatus.STATUS_OK) continue;
+				break;
+			case DataType.DOUBLE:
+				res = comOp.getNext(d);
+				if(res.returnStatus!=POStatus.STATUS_OK) continue;
+				break;
+			case DataType.FLOAT:
+				res = comOp.getNext(f);
+				if(res.returnStatus!=POStatus.STATUS_OK) continue;
+				break;
+			case DataType.INTEGER:
+				res = comOp.getNext(i);
+				if(res.returnStatus!=POStatus.STATUS_OK) continue;
+				break;
+			case DataType.LONG:
+				res = comOp.getNext(l);
+				if(res.returnStatus!=POStatus.STATUS_OK) continue;
+				break;
+			}
+			
+			if(res==null){
+				return new Result();
+			}
+			if((Boolean)res.result==true){
+				return inp;
+			}
+		}
+		return inp;
+	}
+	
+	@Override
+	public String name() {
+		return "Filter - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+		return false;
+	}
+
+	@Override
+	public String typeName() {
+		return getClass().getName();
+	}
+
+	@Override
+	public void visit(PhyPlanVisitor v) throws ParseException {
+		v.visitFilter(this);
+	}
+
+	public void setPlan(ExprPlan plan) {
+		this.plan = plan;
+		comOp = (ComparisonOperator)(plan.getLeaves()).get(0);
+		compOperandType = comOp.getOperandType();
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,230 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * 
+ * This is the base class for all operators. This supports a 
+ * generic way of processing inputs which can be overridden by 
+ * operators extending this class. The input model assumes that
+ * it can either be taken from an operator or can be attached 
+ * directly to this operator. Also it is assumed that inputs to an
+ * operator are always in the form of a tuple.
+ * 
+ * For this pipeline rework, we assume a pull based model, i.e, 
+ * the root operator is going to call getNext with the appropriate
+ * type which initiates a cascade of getNext calls that unroll to
+ * create input for the root operator to work on. 
+ * 
+ * Any operator that extends the PhysicalOperator, supports a getNext
+ * with all the different types of parameter types. The concrete implementation
+ * should use the result type of its input operator to decide the type of getNext's
+ * parameter. This is done to avoid switch/case based on the type
+ * as much as possible. The default is assumed to return an erroneus Result
+ * corresponding to an unsupported operation on that type. So the operators need
+ * to implement only those types that are supported.
+ *
+ * @param <V>
+ */
+public abstract class PhysicalOperator<V extends PhyPlanVisitor> extends Operator<V> {
+	
+	private Log log = LogFactory.getLog(getClass());
+	
+	static final long serialVersionUID = 1L;
+	
+	//The degree of parallelism requested
+	protected int requestedParallelism;
+	
+	//The inputs that this operator will read data from
+	protected List<PhysicalOperator<V>> inputs;
+	
+	//The outputs that this operator will write data to
+	//Will be used to create Targeted tuples
+	protected List<PhysicalOperator<V>> outputs;
+	
+	//The data type for the results of this operator
+	protected byte resultType = DataType.TUPLE;
+	
+	//Specifies if the input has been directly attached
+	protected boolean inputAttached = false;
+	
+	//If inputAttached is true, input is set to the input tuple
+	protected Tuple input = null;
+	
+	//The result of performing the operation along with the output
+	protected Result res = null;
+
+	public PhysicalOperator(OperatorKey k) {
+		this(k,-1,null);
+	}
+	
+	public PhysicalOperator(OperatorKey k, int rp){
+		this(k,rp,null);
+	}
+	
+	public PhysicalOperator(OperatorKey k, List<PhysicalOperator<V>> inp){
+		this(k,-1,inp);
+	}
+	
+	public PhysicalOperator(OperatorKey k, int rp, List<PhysicalOperator<V>> inp){
+		super(k);
+		requestedParallelism = rp;
+		inputs = inp;
+		res = new Result();
+	}
+
+	public int getRequestedParallelism() {
+		return requestedParallelism;
+	}
+
+	public void setRequestedParallelism(int requestedParallelism) {
+		this.requestedParallelism = requestedParallelism;
+	}
+	
+	public byte getResultType() {
+		return resultType;
+	}
+
+	public void setResultType(byte resultType) {
+		this.resultType = resultType;
+	}
+	
+	public List<PhysicalOperator<V>> getInputs() {
+		return inputs;
+	}
+	
+	public void setInputs(List<PhysicalOperator<V>> inputs) {
+		this.inputs = inputs;
+	}
+	
+	public boolean isInputAttached() {
+		return inputAttached;
+	}
+
+	public void setInputAttached(boolean inputAttached) {
+		this.inputAttached = inputAttached;
+	}
+	
+	/**
+	 * Shorts the input path of this operator by providing
+	 * the input tuple directly
+	 * @param t - The tuple that should be used as input
+	 */
+	public void attachInput(Tuple t){
+		input = t;
+		this.inputAttached = true;
+	}
+	
+	/**
+	 * Detaches any tuples that are attached
+	 *
+	 */
+	public void detachInput(){
+		input = null;
+		this.inputAttached = false;
+	}
+	
+	/**
+	 * A blocking operator should override this to return true.
+	 * Blocking operators are those that need the full bag before
+	 * operate on the tuples inside the bag. Example is the Global Rearrange.
+	 * Non-blocking or pipeline operators are those that work on
+	 * a tuple by tuple basis.
+	 * @return true if blocking and false otherwise
+	 */
+	public boolean isBlocking(){
+		return false;
+	}
+	
+	/**
+	 * A generic method for parsing input that either returns 
+	 * the attached input if it exists or fetches it from its
+	 * predecessor. If special processing is required, this 
+	 * method should be overridden.
+	 * @return The Result object that results from processing the
+	 * 			input
+	 * @throws ExecException
+	 */
+	public Result processInput() throws ExecException{
+		Result res = new Result();
+		Tuple inpValue = null;
+		if(input==null && inputs==null) {
+			log.warn("No inputs found. Signaling End of Processing.");
+			res.returnStatus = POStatus.STATUS_EOP;
+			return res;
+		}
+		if(!isInputAttached())
+			return inputs.get(0).getNext(inpValue);
+		else{
+			res.result = input;
+			res.returnStatus = POStatus.STATUS_OK;
+			detachInput();
+			return res;
+		}
+	}
+	
+	public abstract void visit(V v) throws ParseException ;
+
+	public Result getNext(Integer i) throws ExecException {
+		return res;
+	}
+	
+	
+	public Result getNext(Long l) throws ExecException {
+		return res;
+	}
+
+	
+	public Result getNext(Double d) throws ExecException {
+		return res;
+	}
+
+	
+	public Result getNext(Float f) throws ExecException {
+		return res;
+	}
+
+	
+	public Result getNext(String s) throws ExecException {
+		return res;
+	}
+
+	
+	public Result getNext(DataByteArray ba) throws ExecException {
+		return res;
+	}
+	
+	public Result getNext(Map m) throws ExecException{
+		return res;
+	}
+	
+	public Result getNext(Boolean b) throws ExecException{
+		return res;
+	}
+
+	
+	public Result getNext(Tuple t) throws ExecException {
+		return res;
+	}
+
+	
+	public Result getNext(DataBag db) throws ExecException {
+		return res;
+	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,155 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.backend.executionengine.ExecException;
+
+
+/**
+ * This class implements a Constant of any type.
+ * Its value can be set using the setValue method. 
+ *
+ */
+public class ConstantExpression extends ExpressionOperator {
+	
+	private Log log = LogFactory.getLog(getClass());
+	
+	//The value that this constant represents
+	Object value;
+	
+	//The result of calling getNext
+	Result res = new Result();
+
+	public ConstantExpression(OperatorKey k) {
+		this(k,-1);
+	}
+
+	public ConstantExpression(OperatorKey k, int rp) {
+		super(k, rp);
+	}
+
+	@Override
+	public String name() {
+		return "Constant(" + value.toString() +") - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+		return false;
+	}
+
+	@Override
+	public String typeName() {
+		return getClass().getName();
+	}
+
+	@Override
+	public void visit(ExprPlanVisitor v) throws ParseException {
+		v.visitConstant(this);
+	}
+
+	public Object getValue() {
+		return value;
+	}
+
+	public void setValue(Object value) {
+		this.value = value;
+	}
+
+	@Override
+	public Result getNext(DataBag db) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (DataBag)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (DataByteArray)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Double)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Float)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Integer)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Long)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (String)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Tuple t) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Tuple)value;
+		return res;
+	}
+	
+	
+	
+	@Override
+	public Result getNext(Boolean b) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Boolean)value;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Map m) throws ExecException {
+		
+		res.returnStatus = POStatus.STATUS_OK;
+		res.result = (Map)value;
+		return res;
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,33 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+
+/**
+ * A base class for all types of expressions. All expression
+ * operators must extend this class.
+ *
+ */
+
+public abstract class ExpressionOperator extends PhysicalOperator<ExprPlanVisitor> {
+	
+	public ExpressionOperator(OperatorKey k) {
+		this(k,-1);
+	}
+
+	public ExpressionOperator(OperatorKey k, int rp) {
+		super(k, rp);
+	}
+	
+	@Override
+	public boolean supportsMultipleOutputs() {
+		return false;
+	}
+	
+	public abstract void visit(ExprPlanVisitor v) throws ParseException;
+	
+	
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,225 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.physicalLayer.util.operatorHelper;
+
+/**
+ * Implements the overloaded form of the project operator.
+ * Projects the specified column from the input tuple.
+ * However, if asked for tuples when the input is a bag,
+ * the overloaded form is invoked and the project streams
+ * the tuples through instead of the bag.
+ */
+public class POProject extends ExpressionOperator {
+	
+	private Log log = LogFactory.getLog(getClass());
+	
+	//The column to project
+	int column = 0;
+	
+	//True if we are in the middle of streaming tuples
+	//in a bag
+	boolean processingBagOfTuples = false;
+	
+	//The bag iterator used while straeming tuple
+	Iterator<Tuple> bagIterator = null;
+	
+	//Temporary tuple
+	Tuple temp = null;
+	
+	public POProject(OperatorKey k) {
+		this(k,-1,0);
+	}
+
+	public POProject(OperatorKey k, int rp) {
+		this(k, rp, 0);
+	}
+	
+	public POProject(OperatorKey k, int rp, int col) {
+		super(k, rp);
+		this.column = col;
+	}
+
+	@Override
+	public String name() {
+		return "Project(" + column + ") - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+		return false;
+	}
+
+	@Override
+	public String typeName() {
+		return getClass().getName();
+	}
+
+	@Override
+	public void visit(ExprPlanVisitor v) throws ParseException {
+		v.visitProject(this);
+	}
+	
+	
+	/**
+	 * Overridden since the attachment of the new input
+	 * should cause the old processing to end.
+	 */
+	@Override
+	public void attachInput(Tuple t) {
+		super.attachInput(t);
+		processingBagOfTuples = false;
+	}
+	
+	/**
+	 * Fetches the input tuple and returns the requested
+	 * column
+	 * @return
+	 * @throws ExecException
+	 */
+	public Result getNext() throws ExecException{
+		Result res = processInput();
+
+		if(res.returnStatus != POStatus.STATUS_OK){
+			if((res.returnStatus == POStatus.STATUS_ERR)){
+				res.returnStatus = POStatus.STATUS_NULL;
+			}
+			return res;
+		}
+		try {
+			res.result = ((Tuple)res.result).get(column);
+		} catch (IOException e) {
+			//Instead of propagating ERR through the pipeline
+			//considering this a nullified operation and returning NULL 
+			//res.returnStatus = POStatus.STATUS_ERR;
+			res.returnStatus = POStatus.STATUS_NULL;
+			log.fatal(e.getMessage());
+		}
+		return res;
+	}
+
+	@Override
+	public Result getNext(DataBag db) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+		return getNext();
+	}
+	
+	
+
+	@Override
+	public Result getNext(Boolean b) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Map m) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+		return getNext();
+	}
+	
+	/**
+	 * Asked for Tuples. Check if the input is a bag.
+	 * If so, stream the tuples in the bag instead of
+	 * the entire bag.
+	 */
+	@Override
+	public Result getNext(Tuple t) throws ExecException {
+		Result res = new Result();
+		if(!processingBagOfTuples){
+			Tuple inpValue = null;
+			res = processInput();
+			if(res.returnStatus!=POStatus.STATUS_OK)
+				return res;
+			inpValue = (Tuple)res.result;
+			res.result = null;
+			
+			try {
+				Object ret = inpValue.get(column);
+				if(ret instanceof DataBag){
+					DataBag retBag = (DataBag)ret;
+					bagIterator = retBag.iterator();
+					if(bagIterator.hasNext()){
+						processingBagOfTuples = true;
+						res.result = bagIterator.next();
+					}
+				}
+				else {
+					res.result = (Tuple)ret;
+				}
+				return res;
+			} catch (IOException e) {
+				res.returnStatus = POStatus.STATUS_ERR;
+				log.error(e.getMessage());
+			}
+		}
+		if(bagIterator.hasNext()){
+			res.result = bagIterator.next();
+			res.returnStatus = POStatus.STATUS_OK;
+			return res;
+		}
+		else{
+			//done processing the bag of tuples
+			processingBagOfTuples = false;
+			return getNext(t);
+		}
+	}
+
+	public int getColumn() {
+		return column;
+	}
+
+	public void setColumn(int column) {
+		this.column = column;
+	}
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,45 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+
+/**
+ * A base class for all Binary expression operators.
+ * Supports the lhs and rhs operators which are used
+ * to fetch the inputs and apply the appropriate operation
+ * with the appropriate type.
+ *
+ */
+public abstract class BinaryExpressionOperator extends ExpressionOperator {
+	protected ExpressionOperator lhs;
+	protected ExpressionOperator rhs;
+	
+	public BinaryExpressionOperator(OperatorKey k) {
+		this(k,-1);
+	}
+
+	public BinaryExpressionOperator(OperatorKey k, int rp) {
+		super(k, rp);
+	}
+
+	public ExpressionOperator getLhs() {
+		return lhs;
+	}
+	
+	@Override
+	public boolean supportsMultipleInputs() {
+		return true;
+	}
+
+	public void setLhs(ExpressionOperator lhs) {
+		this.lhs = lhs;
+	}
+
+	public ExpressionOperator getRhs() {
+		return rhs;
+	}
+
+	public void setRhs(ExpressionOperator rhs) {
+		this.rhs = rhs;
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,42 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators;
+
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.BinaryExpressionOperator;
+
+/**
+ * This is a base class for all comparison operators. Supports the
+ * use of operand type instead of result type as the result type is
+ * always boolean.
+ * 
+ * All comparison operators fetch the lhs and rhs operands and compare
+ * them for each type using different comparison methods based on what
+ * comparison is being implemented.
+ *
+ */
+public abstract class ComparisonOperator extends BinaryExpressionOperator {
+	//The result type for comparison operators is always
+	//Boolean. So the plans evaluating these should consider
+	//the type of the operands instead of the result.
+	//The result will be comunicated using the Status object.
+	//This is a slight abuse of the status object.
+	protected byte operandType;
+	
+	public ComparisonOperator(OperatorKey k) {
+		this(k,-1);
+	}
+
+	public ComparisonOperator(OperatorKey k, int rp) {
+		super(k, rp);
+	}
+
+	public byte getOperandType() {
+		return operandType;
+	}
+
+	public void setOperandType(byte operandType) {
+		this.operandType = operandType;
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,265 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.backend.executionengine.ExecException;
+
+
+public class GreaterThanExpr extends ComparisonOperator {
+
+	private final Log log = LogFactory.getLog(getClass());
+	
+	public GreaterThanExpr(OperatorKey k) {
+		this(k,-1);
+	}
+
+	public GreaterThanExpr(OperatorKey k, int rp) {
+		super(k, rp);
+	}
+
+	@Override
+	public String name() {
+		return "Greater Than - " + mKey.toString();
+	}
+
+	@Override
+	public String typeName() {
+		return getClass().getName();
+	}
+
+	@Override
+	public void visit(ExprPlanVisitor v) throws ParseException {
+		v.visitGreaterThan(this);
+	}
+	
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+		byte status;
+		Result res;
+		DataByteArray left=null, right=null;
+		res = lhs.getNext(left);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		left = (DataByteArray)res.result;
+		
+		
+		
+		res = rhs.getNext(right);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		right = (DataByteArray)res.result;
+		
+		int ret = left.compareTo(right);
+		if(ret==-1){
+			res.result = new Boolean(true);
+			//left = right = null;
+			return res;
+		}
+		else{
+			res.result = new Boolean(false);
+			//left = right = null;
+			return res;
+		}
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+		byte status;
+		Result res;
+		Double left=null, right=null;
+		res = lhs.getNext(left);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		left = (Double)res.result;
+		
+		
+		
+		res = rhs.getNext(right);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		right = (Double)res.result;
+		
+		if(left>right){
+			res.result = new Boolean(true);
+			//left = right = null;
+			return res;
+		}
+		else{
+			res.result = new Boolean(false);
+			//left = right = null;
+			return res;
+		}
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+		byte status;
+		Result res;
+		Float left=null, right=null;
+		res = lhs.getNext(left);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		left = (Float)res.result;
+		
+		
+		
+		res = rhs.getNext(right);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		right = (Float)res.result;
+		
+		if(left>right){
+			res.result = new Boolean(true);
+			//left = right = null;
+			return res;
+		}
+		else{
+			res.result = new Boolean(false);
+			//left = right = null;
+			return res;
+		}
+	}
+
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		byte status;
+		Result res;
+		Integer left=null, right=null;
+		res = lhs.getNext(left);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		left = (Integer)res.result;
+		
+		
+		
+		res = rhs.getNext(right);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		right = (Integer)res.result;
+		
+		if(left>right){
+			res.result = new Boolean(true);
+			//left = right = null;
+			return res;
+		}
+		else{
+			res.result = new Boolean(false);
+			//left = right = null;
+			return res;
+		}
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+		byte status;
+		Result res;
+		Long left=null, right=null;
+		res = lhs.getNext(left);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		left = (Long)res.result;
+		
+		
+		
+		res = rhs.getNext(right);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		right = (Long)res.result;
+		
+		if(left>right){
+			res.result = new Boolean(true);
+			//left = right = null;
+			return res;
+		}
+		else{
+			res.result = new Boolean(false);
+			//left = right = null;
+			return res;
+		}
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+		byte status;
+		Result res;
+		String left=null, right=null;
+		
+		res = lhs.getNext(left);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		left = (String)res.result;
+		
+		
+		
+		res = rhs.getNext(right);
+		status = res.returnStatus;
+		if(status!=POStatus.STATUS_OK) { 
+			log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+			res.returnStatus=POStatus.STATUS_NULL;
+			return res;
+		}
+		right = (String)res.result;
+		
+		int ret = left.compareTo(right);
+		if(ret>0){
+			res.result = new Boolean(true);
+			//left = right = null;
+			return res;
+		}
+		else{
+			res.result = new Boolean(false);
+			//left = right = null;
+			return res;
+		}
+	}
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,34 @@
+package org.apache.pig.impl.physicalLayer.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+
+public class operatorHelper {
+	public static int numTypes(){
+		byte[] types = genAllTypes();
+		return types.length;
+	}
+	public static byte[] genAllTypes(){
+		byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY, 
+				DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.MAP, DataType.TUPLE};
+		return types;
+	}
+	
+	private static String[] genAllTypeNames(){
+		String[] names = { "BAG", "BOOLEAN", "BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER", "LONG", 
+				"MAP", "TUPLE" };
+		return names;
+	}
+	
+	public static Map<Byte, String> genTypeToNameMap(){
+		byte[] types = genAllTypes();
+		String[] names = genAllTypeNames();
+		Map<Byte,String> ret = new HashMap<Byte, String>();
+		for(int i=0;i<types.length;i++){
+			ret.put(types[i], names[i]);
+		}
+		return ret;
+	}
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java?rev=643092&r1=643091&r2=643092&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java Mon Mar 31 11:33:08 2008
@@ -18,9 +18,10 @@
 
 package org.apache.pig.impl.plan;
 
+import java.io.Serializable;
+import java.lang.StringBuilder;
 import java.util.ArrayList;
 import java.util.List;
-import java.lang.StringBuilder;
 
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -28,7 +29,7 @@
 /**
  * Base class for all types of operators.
  */
-abstract public class Operator<V extends PlanVisitor> {
+abstract public class Operator<V extends PlanVisitor> implements Serializable {
     private static final long serialVersionUID = 1L;
 
     /**

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,122 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestConstExpr {
+	Random r = new Random();
+	ConstantExpression ce = (ConstantExpression) GenPhyOp.exprConst();
+	
+	@Before
+	public void setUp() throws Exception {
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testGetNextInteger() throws ExecException {
+		Integer inp = r.nextInt();
+		ce.setValue(inp);
+		Result resi = ce.getNext(inp);
+		Integer ret = (Integer)resi.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextLong() throws ExecException {
+		Long inp = r.nextLong();
+		ce.setValue(inp);
+		Result resl = ce.getNext(inp);
+		Long ret = (Long)resl.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextDouble() throws ExecException {
+		Double inp = r.nextDouble();
+		ce.setValue(inp);
+		Result resd = ce.getNext(inp);
+		Double ret = (Double)resd.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextFloat() throws ExecException {
+		Float inp = r.nextFloat();
+		ce.setValue(inp);
+		Result resf = ce.getNext(inp);
+		Float ret = (Float)resf.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextString() throws ExecException {
+		String inp = GenRandomData.genRandString(r);
+		ce.setValue(inp);
+		Result ress = ce.getNext(inp);
+		String ret = (String)ress.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextDataByteArray() throws ExecException {
+		DataByteArray inp = GenRandomData.genRandDBA(r);
+		ce.setValue(inp);
+		Result resba = ce.getNext(inp);
+		DataByteArray ret = (DataByteArray)resba.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextMap() throws ExecException {
+		Map<Integer,String> inp = GenRandomData.genRandMap(r, 10);
+		ce.setValue(inp);
+		Result resm = ce.getNext(inp);
+		Map<Integer,String> ret = (Map)resm.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextBoolean() throws ExecException {
+		Boolean inp = r.nextBoolean();
+		ce.setValue(inp);
+		Result res = ce.getNext(inp);
+		Boolean ret = (Boolean)res.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextTuple() throws ExecException {
+		Tuple inp = GenRandomData.genRandSmallBagTuple(r, 10, 100);
+		ce.setValue(inp);
+		Result rest = ce.getNext(inp);
+		Tuple ret = (Tuple)rest.result;
+		assertEquals(inp, ret);
+	}
+
+	@Test
+	public void testGetNextDataBag() throws ExecException {
+		DataBag inp = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+		ce.setValue(inp);
+		Result res = ce.getNext(inp);
+		DataBag ret = (DataBag)res.result;
+		assertEquals(inp, ret);
+	}
+
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,45 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFilter {
+	POFilter pass;
+	POFilter fail;
+	Tuple t;
+	
+	@Before
+	public void setUp() throws Exception {
+		pass = GenPhyOp.topFilterOpWithExPlan(50, 25);
+		fail = GenPhyOp.topFilterOpWithExPlan(25, 50);
+		
+		t = GenRandomData.genRandSmallBagTuple(new Random(), 10, 100);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testGetNextTuple() throws ExecException {
+		pass.attachInput(t);
+		Result res = pass.getNext(t);
+		assertEquals(t, res.result);
+		fail.attachInput(t);
+		res = fail.getNext(t);
+		assertEquals(res.returnStatus, POStatus.STATUS_EOP);
+	}
+
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,68 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPhyOp {
+	PhysicalOperator<PhyPlanVisitor> op;
+	PhysicalOperator<PhyPlanVisitor> inpOp;
+	Tuple t;
+
+	@Before
+	public void setUp() throws Exception {
+		op = GenPhyOp.topFilterOp();
+		inpOp = GenPhyOp.topFilterOpWithExPlan(25,10);
+		t = GenRandomData.genRandSmallBagTuple(new Random(), 10, 100);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testProcessInput() throws ExecException {
+		//Stand-alone tests
+		Result res = op.processInput();
+		assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+		op.attachInput(t);
+		res = op.processInput();
+		assertEquals(POStatus.STATUS_OK, res.returnStatus);
+		assertEquals(t, res.result);
+		op.detachInput();
+		res = op.processInput();
+		assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+		
+		//With input operator
+		List<PhysicalOperator<PhyPlanVisitor>> inp = new ArrayList<PhysicalOperator<PhyPlanVisitor>>();
+		inp.add(inpOp);
+		op.setInputs(inp);
+		op.processInput();
+		assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+		
+		inpOp.attachInput(t);
+		res = op.processInput();
+		assertEquals(POStatus.STATUS_OK, res.returnStatus);
+		assertEquals(t, res.result);
+		inpOp.detachInput();
+		res = op.processInput();
+		assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+	}
+
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,54 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestProject {
+	Tuple typFinder;
+	Random r;
+	
+	Tuple t;
+	Result res;
+	POProject proj;
+	
+	@Before
+	public void setUp() throws Exception {
+		r = new Random();
+		typFinder = GenRandomData.genRandSmallBagTuple(r, 10, 100);
+		t = GenRandomData.genRandSmallBagTuple(r,10,100);
+		res = new Result();
+		proj = GenPhyOp.exprProject();
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testGetNext() throws ExecException, IOException {
+		proj.attachInput(t);
+		for(int j=0;j<t.size();j++){
+			proj.attachInput(t);
+			proj.setColumn(j);
+
+			res = proj.getNext();
+			assertEquals(POStatus.STATUS_OK, res.returnStatus);
+			assertEquals(t.get(j), res.result);
+		}
+	}
+
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,134 @@
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LTOrEqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LessThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
+
+public class GenPhyOp {
+	static Random r = new Random();
+	public static ConstantExpression exprConst(){
+		ConstantExpression ret = new ConstantExpression(new OperatorKey("",r.nextLong()));
+		return ret;
+	}
+
+	public static GreaterThanExpr compGreaterThanExpr(){
+		GreaterThanExpr ret = new GreaterThanExpr(new OperatorKey("",r.nextLong()));
+		return ret;
+	}
+	
+	public static POProject exprProject(){
+		POProject ret = new POProject(new OperatorKey("",r.nextLong()));
+		return ret;
+	}
+//	
+//	public static GTOrEqualToExpr compGTOrEqualToExpr(){
+//		GTOrEqualToExpr ret = new GTOrEqualToExpr(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static EqualToExpr compEqualToExpr(){
+//		EqualToExpr ret = new EqualToExpr(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static NotEqualToExpr compNotEqualToExpr(){
+//		NotEqualToExpr ret = new NotEqualToExpr(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static LessThanExpr compLessThanExpr(){
+//		LessThanExpr ret = new LessThanExpr(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static LTOrEqualToExpr compLTOrEqualToExpr(){
+//		LTOrEqualToExpr ret = new LTOrEqualToExpr(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static POLocalRearrange topLocalRearrangeOp(){
+//		POLocalRearrange ret = new POLocalRearrange(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static POGenerate topGenerateOp(){
+//		POGenerate ret = new POGenerate(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static POLoad topLoadOp(){
+//		POLoad ret = new POLoad(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+	public static POFilter topFilterOp(){
+		POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
+		return ret;
+	}
+	
+	public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal) throws IOException{
+		POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
+		
+		ConstantExpression ce1 = GenPhyOp.exprConst();
+		ce1.setValue(lhsVal);
+		
+		ConstantExpression ce2 = GenPhyOp.exprConst();
+		ce2.setValue(rhsVal);
+		
+		GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
+		gr.setLhs(ce1);
+		gr.setRhs(ce2);
+		gr.setOperandType(DataType.INTEGER);
+		
+		ExprPlan ep = new ExprPlan();
+		ep.add(ce1);
+		ep.add(ce2);
+		ep.add(gr);
+		
+		ep.connect(ce1, gr);
+		ep.connect(ce2, gr);
+		
+		ret.setPlan(ep);
+		
+		return ret;
+	}
+//	
+//	public static POGlobalRearrange topGlobalRearrangeOp(){
+//		POGlobalRearrange ret = new POGlobalRearrange(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static POPackage topPackageOp(){
+//		POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static POStore topStoreOp(){
+//		POStore ret = new POStore(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+//	
+//	public static StartMap topStartMapOp(){
+//		StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
+//		return ret;
+//	}
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,111 @@
+package org.apache.pig.test.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+
+public class GenRandomData {
+	
+	
+	public static Map<Integer,String> genRandMap(Random r, int numEnt) {
+		Map<Integer,String> ret = new HashMap<Integer, String>();
+		if(r==null){
+			ret.put(1, "RANDOM");
+			return ret;
+		}
+		for(int i=0;i<numEnt;i++){
+			ret.put(r.nextInt(), genRandString(r));
+		}
+		return ret;
+	}
+	
+	public static String genRandString(Random r){
+		if(r==null) return "RANDOM";
+		char[] chars = new char[10];
+		for(int i=0;i<10;i++){
+			chars[i] = (char)(r.nextInt(26)+65);
+		}
+		return new String(chars);
+	}
+	
+	public static DataByteArray genRandDBA(Random r){
+		if(r==null) return new DataByteArray("RANDOM".getBytes());
+		byte[] bytes = new byte[10];
+		r.nextBytes(bytes);
+		return new DataByteArray(bytes);
+	}
+	
+	public static Tuple genRandSmallTuple(Random r, int limit){
+		if(r==null){
+			Tuple t = new DefaultTuple();
+			t.append("RANDOM");
+			return t;
+		}
+		Tuple t = new DefaultTuple();
+		t.append(genRandString(r));
+		t.append(r.nextInt(limit));
+		return t;
+	}
+	
+	public static Tuple genRandSmallTuple(String s, int value){
+		Tuple t = new DefaultTuple();
+		t.append(s);
+		t.append(value);
+		return t;
+	}
+	
+	public static DataBag genRandSmallTupDataBag(Random r, int num, int limit){
+		if(r==null) {
+			DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
+			Tuple t = new DefaultTuple();
+			t.append("RANDOM");
+			db.add(t);
+			return db;
+		}
+		DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
+		for(int i=0;i<num;i++){
+			db.add(genRandSmallTuple(r, limit));
+		}
+		return db;
+	}
+	
+	public static Tuple genRandSmallBagTuple(Random r, int num, int limit){
+		if(r==null){
+			Tuple t = new DefaultTuple();
+			t.append("RANDOM");
+			return t;
+		}
+		Tuple t = new DefaultTuple();
+		t.append(genRandSmallTupDataBag(r, num, limit));
+		t.append(r.nextBoolean());
+		t.append(genRandDBA(r));
+		t.append(genRandString(r));
+		t.append(r.nextDouble());
+		t.append(r.nextFloat());
+		t.append(r.nextInt());
+		t.append(r.nextLong());
+		t.append(genRandMap(r, num));
+		return t;
+	}
+	
+	public static DataBag genRandFullTupDataBag(Random r, int num, int limit){
+		if(r==null) {
+			DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
+			Tuple t = new DefaultTuple();
+			t.append("RANDOM");
+			db.add(t);
+			return db;
+		}
+		DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
+		for(int i=0;i<num;i++){
+			db.add(genRandSmallBagTuple(r, num, limit));
+		}
+		return db;
+	}
+}