You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/03/31 20:33:14 UTC
svn commit: r643092 - in /incubator/pig/branches/types: ./
src/org/apache/pig/impl/physicalLayer/
src/org/apache/pig/impl/physicalLayer/plans/
src/org/apache/pig/impl/physicalLayer/topLevelOperators/
src/org/apache/pig/impl/physicalLayer/topLevelOperat...
Author: gates
Date: Mon Mar 31 11:33:08 2008
New Revision: 643092
URL: http://svn.apache.org/viewvc?rev=643092&view=rev
Log:
Checked in Shravan's first incr1 patch, which contains the first version of a few of the new physical operators.
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java
incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=643092&r1=643091&r2=643092&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon Mar 31 11:33:08 2008
@@ -132,7 +132,16 @@
</target>
<target name="compile-sources">
- <javac encoding="${build.encoding}" srcdir="${sources}" includes="**/plan/*.java, **/data/*.java, **/pig/builtin/*.java, **/test/TestOperatorPlan.java, **/test/TestBuiltin.java, **/logicalLayer/*.java, **/logicalLayer/schema/*.java " destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}">
+ <javac encoding="${build.encoding}" srcdir="${sources}"
+ includes="**/plan/*.java, **/data/*.java, **/pig/builtin/*.java,
+ **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
+ **/test/TestConstExpr.java, **/test/TestFilter.java, **/test/TestPhyOp.java,
+ **/test/TestProject.java, **/test/utils/*.java, **/logicalLayer/*.java,
+ **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
+ **/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
+ **/physicalLayer/util/*.java, **/physicalLayer/Result.java,
+ **/physicalLayer/POStatus.java"
+ destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}">
<compilerarg line="${javac.args} ${javac.args.warnings}" />
<classpath refid="${cp}" />
</javac>
@@ -216,6 +225,10 @@
<fileset dir="test">
<include name="**/TestBuiltin.java" />
<include name="**/TestOperatorPlan.java" />
+ <include name="**/TestPhyOp.java" />
+ <include name="**/TestConstExpr.java" />
+ <include name="**/TestProject.java" />
+ <include name="**/TestFilter.java" />
<!--
<include name="**/*Test*.java" />
<exclude name="**/TestLargeFile.java" />
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStatus.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,10 @@
+package org.apache.pig.impl.physicalLayer;
+
+public class POStatus {
+ public static final byte STATUS_OK = 0;
+ public static final byte STATUS_NULL = 1;
+ public static final byte STATUS_ERR = 2;
+ public static final byte STATUS_EOP = 3; // end of processing
+
+ public static Object result;
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,17 @@
+package org.apache.pig.impl.physicalLayer;
+
+import java.io.Serializable;
+
+public class Result implements Serializable{
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ public byte returnStatus;
+ public Object result;
+
+ public Result(){
+ returnStatus = POStatus.STATUS_ERR;
+ result = null;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,16 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+
+/**
+ * A plan that supports expressions by combining the
+ * expression operators. This will be used by top level
+ * operators like filter and foreach that have Expression Plans
+ * as their attribute plan.
+ *
+ */
+public class ExprPlan extends PhysicalPlan<ExpressionOperator> {
+ public ExprPlan() {
+ super();
+ }
+}
\ No newline at end of file
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,63 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LTOrEqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LessThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
+
+/**
+ * The visitor to be used for visiting expression plans.
+ * Create the visitor with the ExprPlan to be visited.
+ * Call the visit() method for a depth first traversal.
+ *
+ */
+public abstract class ExprPlanVisitor extends PhyPlanVisitor<ExpressionOperator, ExprPlan> {
+
+ private final Log log = LogFactory.getLog(getClass());
+
+ public ExprPlanVisitor(ExprPlan plan) {
+ super(plan);
+ // TODO Auto-generated constructor stub
+ }
+
+ public void visitConstant(ConstantExpression cnst){
+ //do nothing
+ }
+
+ public void visitProject(POProject proj){
+ //do nothing
+ }
+
+ public void visitGreaterThan(GreaterThanExpr grt){
+ //do nothing
+ }
+
+ /*public void visitLessThan(LessThanExpr lt){
+ //do nothing
+ }
+
+ public void visitGTOrEqual(GTOrEqualToExpr gte){
+ //do nothing
+ }
+
+ public void visitLTOrEqual(LTOrEqualToExpr lte){
+ //do nothing
+ }
+
+ public void visitEqualTo(EqualToExpr eq){
+ //do nothing
+ }
+
+ public void visitNotEqualTo(NotEqualToExpr eq){
+ //do nothing
+ }*/
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,67 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+/**
+ * The visitor class for the Physical Plan. To use this,
+ * create the visitor with the plan to be visited. Call
+ * the visit() method to traverse the plan in a depth first
+ * fashion.
+ *
+ * This class can be used to visit only the top level
+ * relational operators. Classes extending this should
+ * use the specific plans to visit the attribute plans
+ * of each of the top level operators.
+ *
+ * @param <O>
+ * @param <P>
+ */
+public abstract class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
+
+ public PhyPlanVisitor(P plan) {
+ super(plan);
+ }
+
+ @Override
+ public void visit() throws ParseException {
+ depthFirst();
+ }
+
+// public void visitLoad(POLoad ld){
+// //do nothing
+// }
+//
+// public void visitStore(POStore st){
+// //do nothing
+// }
+//
+ public void visitFilter(POFilter fl){
+ //do nothing
+ }
+//
+// public void visitLocalRearrange(POLocalRearrange lr){
+// //do nothing
+// }
+//
+// public void visitGlobalRearrange(POGlobalRearrange gr){
+// //do nothing
+// }
+//
+// public void visitStartMap(StartMap sm){
+// //do nothing
+// }
+//
+// public void visitPackage(POPackage pkg){
+// //do nothing
+// }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,38 @@
+package org.apache.pig.impl.physicalLayer.plans;
+
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/**
+ *
+ * The base class for all types of physical plans.
+ * This extends the Operator Plan.
+ *
+ * @param <E>
+ */
+public abstract class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
+
+ public PhysicalPlan() {
+ super();
+ }
+
+ public void attachInput(Tuple t){
+ List<E> roots = getRoots();
+ for (E operator : roots)
+ operator.attachInput(t);
+ }
+
+ /**
+ * Write a visual representation of the Physical Plan
+ * into the given output stream
+ * @param out : OutputStream to which the visual representation is written
+ */
+ public void explain(OutputStream out){
+ //Use a plan visitor and output the current physical
+ //plan into out
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,162 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
+
+
+/**
+ * This is an implementation of the Filter operator. It has
+ * an Expression Plan that decides whether the input tuple
+ * should be filtered or passed through. To avoid many function
+ * calls, the filter operator stores the Comparison Operator
+ * that is the root of the Expression Plan and uses its getNext
+ * directly.
+ *
+ * Since the filter is supposed to return tuples only, getNext
+ * is not supported on any other data type.
+ *
+ */
+public class POFilter extends PhysicalOperator<PhyPlanVisitor> {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ //The expression plan
+ ExprPlan plan;
+
+ //The root comparison operator of the expression plan
+ ComparisonOperator comOp;
+
+ //The operand type for the comparison operator needed
+ //to call the comparison operators getNext with the
+ //appropriate type
+ byte compOperandType;
+
+ //Dummy types used to access the getNext of appropriate
+ //type
+ DataByteArray ba;
+ String s;
+ Double d;
+ Float f;
+ Integer i;
+ Long l;
+
+
+ public POFilter(OperatorKey k) {
+ this(k, -1, null);
+ }
+
+ public POFilter(OperatorKey k, int rp) {
+ this(k, rp, null);
+ }
+
+ public POFilter(OperatorKey k, List<PhysicalOperator<PhyPlanVisitor>> inputs){
+ this(k,-1,inputs);
+ }
+
+ public POFilter(OperatorKey k, int rp, List<PhysicalOperator<PhyPlanVisitor>> inputs) {
+ super(k, rp, inputs);
+ }
+
+    /**
+     * Pulls input until a tuple satisfies the filter condition. Each
+     * input tuple is attached to the expression plan and the root
+     * comparison operator is evaluated through the getNext overload
+     * that matches its operand type.
+     *
+     * @param t not read here; it only selects the Tuple overload
+     * @return the input Result whose tuple passed the comparison; the
+     *         input Result itself when processInput() reports
+     *         STATUS_EOP or STATUS_ERR; or a new Result (STATUS_ERR)
+     *         when no getNext overload matches the operand type
+     * @throws ExecException propagated from processInput() or from the
+     *         comparison operator
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result inp = null;
+        while (true) {
+            inp = processInput();
+            // Errors end the loop just like EOP does. Skipping them would
+            // spin forever on an input whose getNext always answers
+            // STATUS_ERR, as the PhysicalOperator defaults do.
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR) {
+                break;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                log.warn("A Null return status was returned by processInput(). Continuing with further input.");
+                continue;
+            }
+
+            plan.attachInput((Tuple) inp.result);
+
+            // Named so that it does not shadow the inherited res field.
+            Result verdict;
+            switch (compOperandType) {
+            case DataType.BYTEARRAY:
+                verdict = comOp.getNext(ba);
+                break;
+            case DataType.CHARARRAY:
+                verdict = comOp.getNext(s);
+                break;
+            case DataType.DOUBLE:
+                verdict = comOp.getNext(d);
+                break;
+            case DataType.FLOAT:
+                verdict = comOp.getNext(f);
+                break;
+            case DataType.INTEGER:
+                verdict = comOp.getNext(i);
+                break;
+            case DataType.LONG:
+                verdict = comOp.getNext(l);
+                break;
+            default:
+                // No overload for this operand type: report an error Result.
+                return new Result();
+            }
+
+            // A comparison that did not evaluate cleanly drops the tuple.
+            if (verdict.returnStatus != POStatus.STATUS_OK) {
+                continue;
+            }
+            // A null outcome is treated as "not passed" instead of
+            // failing on unboxing.
+            Boolean passed = (Boolean) verdict.result;
+            if (passed != null && passed) {
+                return inp;
+            }
+        }
+        return inp;
+    }
+
+ @Override
+ public String name() {
+ return "Filter - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String typeName() {
+ return getClass().getName();
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws ParseException {
+ v.visitFilter(this);
+ }
+
+ public void setPlan(ExprPlan plan) {
+ this.plan = plan;
+ comOp = (ComparisonOperator)(plan.getLeaves()).get(0);
+ compOperandType = comOp.getOperandType();
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,230 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ *
+ * This is the base class for all operators. This supports a
+ * generic way of processing inputs which can be overridden by
+ * operators extending this class. The input model assumes that
+ * it can either be taken from an operator or can be attached
+ * directly to this operator. Also it is assumed that inputs to an
+ * operator are always in the form of a tuple.
+ *
+ * For this pipeline rework, we assume a pull based model, i.e,
+ * the root operator is going to call getNext with the appropriate
+ * type which initiates a cascade of getNext calls that unroll to
+ * create input for the root operator to work on.
+ *
+ * Any operator that extends the PhysicalOperator, supports a getNext
+ * with all the different types of parameter types. The concrete implementation
+ * should use the result type of its input operator to decide the type of getNext's
+ * parameter. This is done to avoid switch/case based on the type
+ * as much as possible. The default is assumed to return an erroneous Result
+ * corresponding to an unsupported operation on that type. So the operators need
+ * to implement only those types that are supported.
+ *
+ * @param <V>
+ */
+public abstract class PhysicalOperator<V extends PhyPlanVisitor> extends Operator<V> {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ static final long serialVersionUID = 1L;
+
+ //The degree of parallelism requested
+ protected int requestedParallelism;
+
+ //The inputs that this operator will read data from
+ protected List<PhysicalOperator<V>> inputs;
+
+ //The outputs that this operator will write data to
+ //Will be used to create Targeted tuples
+ protected List<PhysicalOperator<V>> outputs;
+
+ //The data type for the results of this operator
+ protected byte resultType = DataType.TUPLE;
+
+ //Specifies if the input has been directly attached
+ protected boolean inputAttached = false;
+
+ //If inputAttached is true, input is set to the input tuple
+ protected Tuple input = null;
+
+ //The result of performing the operation along with the output
+ protected Result res = null;
+
+ public PhysicalOperator(OperatorKey k) {
+ this(k,-1,null);
+ }
+
+ public PhysicalOperator(OperatorKey k, int rp){
+ this(k,rp,null);
+ }
+
+ public PhysicalOperator(OperatorKey k, List<PhysicalOperator<V>> inp){
+ this(k,-1,inp);
+ }
+
+ public PhysicalOperator(OperatorKey k, int rp, List<PhysicalOperator<V>> inp){
+ super(k);
+ requestedParallelism = rp;
+ inputs = inp;
+ res = new Result();
+ }
+
+ public int getRequestedParallelism() {
+ return requestedParallelism;
+ }
+
+ public void setRequestedParallelism(int requestedParallelism) {
+ this.requestedParallelism = requestedParallelism;
+ }
+
+ public byte getResultType() {
+ return resultType;
+ }
+
+ public void setResultType(byte resultType) {
+ this.resultType = resultType;
+ }
+
+ public List<PhysicalOperator<V>> getInputs() {
+ return inputs;
+ }
+
+ public void setInputs(List<PhysicalOperator<V>> inputs) {
+ this.inputs = inputs;
+ }
+
+ public boolean isInputAttached() {
+ return inputAttached;
+ }
+
+ public void setInputAttached(boolean inputAttached) {
+ this.inputAttached = inputAttached;
+ }
+
+ /**
+ * Short-circuits the input path of this operator by providing
+ * the input tuple directly
+ * @param t - The tuple that should be used as input
+ */
+ public void attachInput(Tuple t){
+ input = t;
+ this.inputAttached = true;
+ }
+
+ /**
+ * Detaches any tuples that are attached
+ *
+ */
+ public void detachInput(){
+ input = null;
+ this.inputAttached = false;
+ }
+
+ /**
+ * A blocking operator should override this to return true.
+ * Blocking operators are those that need the full bag before they can
+ * operate on the tuples inside the bag. An example is the Global Rearrange.
+ * Non-blocking or pipeline operators are those that work on
+ * a tuple by tuple basis.
+ * @return true if blocking and false otherwise
+ */
+ public boolean isBlocking(){
+ return false;
+ }
+
+    /**
+     * A generic method for obtaining input. It returns the directly
+     * attached tuple if one exists, otherwise it fetches the next
+     * result from the first input operator. If special processing is
+     * required, this method should be overridden.
+     *
+     * @return a STATUS_OK Result wrapping the attached tuple (which is
+     *         detached as a side effect), the Result of calling
+     *         getNext on the first input operator, or a STATUS_EOP
+     *         Result when there is no attached tuple and no input
+     *         operator to pull from
+     * @throws ExecException propagated from the input operator's getNext
+     */
+    public Result processInput() throws ExecException {
+        boolean noInputOperators = (inputs == null || inputs.isEmpty());
+        // The first test is the original guard. The second also covers an
+        // empty input list, which used to fail inside inputs.get(0).
+        if ((input == null && inputs == null)
+                || (!isInputAttached() && noInputOperators)) {
+            log.warn("No inputs found. Signaling End of Processing.");
+            Result eop = new Result();
+            eop.returnStatus = POStatus.STATUS_EOP;
+            return eop;
+        }
+        if (!isInputAttached()) {
+            // A typed null is passed only to select the Tuple overload.
+            Tuple inpValue = null;
+            return inputs.get(0).getNext(inpValue);
+        }
+        // Named so that it does not shadow the res field of this class.
+        Result attached = new Result();
+        attached.result = input;
+        attached.returnStatus = POStatus.STATUS_OK;
+        detachInput();
+        return attached;
+    }
+
+ public abstract void visit(V v) throws ParseException ;
+
+ public Result getNext(Integer i) throws ExecException {
+ return res;
+ }
+
+
+ public Result getNext(Long l) throws ExecException {
+ return res;
+ }
+
+
+ public Result getNext(Double d) throws ExecException {
+ return res;
+ }
+
+
+ public Result getNext(Float f) throws ExecException {
+ return res;
+ }
+
+
+ public Result getNext(String s) throws ExecException {
+ return res;
+ }
+
+
+ public Result getNext(DataByteArray ba) throws ExecException {
+ return res;
+ }
+
+ public Result getNext(Map m) throws ExecException{
+ return res;
+ }
+
+ public Result getNext(Boolean b) throws ExecException{
+ return res;
+ }
+
+
+ public Result getNext(Tuple t) throws ExecException {
+ return res;
+ }
+
+
+ public Result getNext(DataBag db) throws ExecException {
+ return res;
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,155 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.backend.executionengine.ExecException;
+
+
+/**
+ * This class implements a Constant of any type.
+ * Its value can be set using the setValue method.
+ *
+ */
+public class ConstantExpression extends ExpressionOperator {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ //The value that this constant represents
+ Object value;
+
+ //The result of calling getNext
+ Result res = new Result();
+
+ public ConstantExpression(OperatorKey k) {
+ this(k,-1);
+ }
+
+ public ConstantExpression(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+    /**
+     * Builds a display name from the constant's value and the operator key.
+     *
+     * @return "Constant(" + value + ") - " + key; a value that has not
+     *         been set yet is rendered as "null" instead of raising a
+     *         NullPointerException
+     */
+    @Override
+    public String name() {
+        return "Constant(" + String.valueOf(value) + ") - " + mKey.toString();
+    }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String typeName() {
+ return getClass().getName();
+ }
+
+ @Override
+ public void visit(ExprPlanVisitor v) throws ParseException {
+ v.visitConstant(this);
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (DataBag)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (DataByteArray)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(Double d) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Double)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Float)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Integer)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Long)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (String)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Tuple)value;
+ return res;
+ }
+
+
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Boolean)value;
+ return res;
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+
+ res.returnStatus = POStatus.STATUS_OK;
+ res.result = (Map)value;
+ return res;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,33 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+
+/**
+ * A base class for all types of expressions. All expression
+ * operators must extend this class.
+ *
+ */
+
+public abstract class ExpressionOperator extends PhysicalOperator<ExprPlanVisitor> {
+
+ public ExpressionOperator(OperatorKey k) {
+ this(k,-1);
+ }
+
+ public ExpressionOperator(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ public abstract void visit(ExprPlanVisitor v) throws ParseException;
+
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,225 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.physicalLayer.util.operatorHelper;
+
+/**
+ * Implements the overloaded form of the project operator.
+ * Projects the specified column from the input tuple.
+ * However, if asked for tuples when the input is a bag,
+ * the overloaded form is invoked and the project streams
+ * the tuples through instead of the bag.
+ */
+public class POProject extends ExpressionOperator {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ //The column to project
+ int column = 0;
+
+ //True if we are in the middle of streaming tuples
+ //in a bag
+ boolean processingBagOfTuples = false;
+
+ //The bag iterator used while straeming tuple
+ Iterator<Tuple> bagIterator = null;
+
+ //Temporary tuple
+ Tuple temp = null;
+
+ public POProject(OperatorKey k) {
+ this(k,-1,0);
+ }
+
+ public POProject(OperatorKey k, int rp) {
+ this(k, rp, 0);
+ }
+
+ public POProject(OperatorKey k, int rp, int col) {
+ super(k, rp);
+ this.column = col;
+ }
+
+ @Override
+ public String name() {
+ return "Project(" + column + ") - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String typeName() {
+ return getClass().getName();
+ }
+
+ @Override
+ public void visit(ExprPlanVisitor v) throws ParseException {
+ v.visitProject(this);
+ }
+
+
+ /**
+ * Overridden since the attachment of the new input
+ * should cause the old processing to end.
+ */
+ @Override
+ public void attachInput(Tuple t) {
+ super.attachInput(t);
+ processingBagOfTuples = false;
+ }
+
+ /**
+ * Fetches the input tuple and returns the requested
+ * column
+ * @return
+ * @throws ExecException
+ */
+ public Result getNext() throws ExecException{
+ Result res = processInput();
+
+ if(res.returnStatus != POStatus.STATUS_OK){
+ if((res.returnStatus == POStatus.STATUS_ERR)){
+ res.returnStatus = POStatus.STATUS_NULL;
+ }
+ return res;
+ }
+ try {
+ res.result = ((Tuple)res.result).get(column);
+ } catch (IOException e) {
+ //Instead of propagating ERR through the pipeline
+ //considering this a nullified operation and returning NULL
+ //res.returnStatus = POStatus.STATUS_ERR;
+ res.returnStatus = POStatus.STATUS_NULL;
+ log.fatal(e.getMessage());
+ }
+ return res;
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Double d) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+ return getNext();
+ }
+
+
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+ return getNext();
+ }
+
+ /**
+ * Asked for Tuples. Check if the input is a bag.
+ * If so, stream the tuples in the bag instead of
+ * the entire bag.
+ */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ Result res = new Result();
+ if(!processingBagOfTuples){
+ Tuple inpValue = null;
+ res = processInput();
+ if(res.returnStatus!=POStatus.STATUS_OK)
+ return res;
+ inpValue = (Tuple)res.result;
+ res.result = null;
+
+ try {
+ Object ret = inpValue.get(column);
+ if(ret instanceof DataBag){
+ DataBag retBag = (DataBag)ret;
+ bagIterator = retBag.iterator();
+ if(bagIterator.hasNext()){
+ processingBagOfTuples = true;
+ res.result = bagIterator.next();
+ }
+ }
+ else {
+ res.result = (Tuple)ret;
+ }
+ return res;
+ } catch (IOException e) {
+ res.returnStatus = POStatus.STATUS_ERR;
+ log.error(e.getMessage());
+ }
+ }
+ if(bagIterator.hasNext()){
+ res.result = bagIterator.next();
+ res.returnStatus = POStatus.STATUS_OK;
+ return res;
+ }
+ else{
+ //done processing the bag of tuples
+ processingBagOfTuples = false;
+ return getNext(t);
+ }
+ }
+
+ public int getColumn() {
+ return column;
+ }
+
+ public void setColumn(int column) {
+ this.column = column;
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,45 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+
+/**
+ * A base class for all Binary expression operators.
+ * Supports the lhs and rhs operators which are used
+ * to fetch the inputs and apply the appropriate operation
+ * with the appropriate type.
+ *
+ */
+public abstract class BinaryExpressionOperator extends ExpressionOperator {
+ protected ExpressionOperator lhs;
+ protected ExpressionOperator rhs;
+
+ public BinaryExpressionOperator(OperatorKey k) {
+ this(k,-1);
+ }
+
+ public BinaryExpressionOperator(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+ public ExpressionOperator getLhs() {
+ return lhs;
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ public void setLhs(ExpressionOperator lhs) {
+ this.lhs = lhs;
+ }
+
+ public ExpressionOperator getRhs() {
+ return rhs;
+ }
+
+ public void setRhs(ExpressionOperator rhs) {
+ this.rhs = rhs;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,42 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators;
+
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.BinaryExpressionOperator;
+
+/**
+ * This is a base class for all comparison operators. Supports the
+ * use of operand type instead of result type as the result type is
+ * always boolean.
+ *
+ * All comparison operators fetch the lhs and rhs operands and compare
+ * them for each type using different comparison methods based on what
+ * comparison is being implemented.
+ *
+ */
+public abstract class ComparisonOperator extends BinaryExpressionOperator {
+ //The result type for comparison operators is always
+ //Boolean. So the plans evaluating these should consider
+ //the type of the operands instead of the result.
+ //The result will be comunicated using the Status object.
+ //This is a slight abuse of the status object.
+ protected byte operandType;
+
+ public ComparisonOperator(OperatorKey k) {
+ this(k,-1);
+ }
+
+ public ComparisonOperator(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+ public byte getOperandType() {
+ return operandType;
+ }
+
+ public void setOperandType(byte operandType) {
+ this.operandType = operandType;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,265 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.backend.executionengine.ExecException;
+
+
+ /**
+  * Comparison operator answering "lhs &gt; rhs".
+  *
+  * Each getNext overload fetches the left and right operands with
+  * the overload of the matching type, compares them, and stores a
+  * Boolean in the returned Result. The argument of each overload
+  * only selects the operand type; its value is not read. If either
+  * operand comes back with a status other than STATUS_OK, the
+  * operand's Result is returned with its status set to STATUS_NULL.
+  */
+ public class GreaterThanExpr extends ComparisonOperator {
+
+     private final Log log = LogFactory.getLog(getClass());
+
+     public GreaterThanExpr(OperatorKey k) {
+         this(k,-1);
+     }
+
+     public GreaterThanExpr(OperatorKey k, int rp) {
+         super(k, rp);
+     }
+
+     @Override
+     public String name() {
+         return "Greater Than - " + mKey.toString();
+     }
+
+     @Override
+     public String typeName() {
+         return getClass().getName();
+     }
+
+     @Override
+     public void visit(ExprPlanVisitor v) throws ParseException {
+         v.visitGreaterThan(this);
+     }
+
+     /** Logs the failed operand fetch and converts its status to NULL. */
+     private Result reportOperandNotOk(Result res) {
+         log.warn("\t\t Recieved an error or a null or an unexpected end of processing. Hence reporting a NULL back.");
+         res.returnStatus=POStatus.STATUS_NULL;
+         return res;
+     }
+
+     /** Stores the outcome of the comparison in the given result. */
+     private static Result setComparisonResult(Result res, boolean isGreater) {
+         res.result = Boolean.valueOf(isGreater);
+         return res;
+     }
+
+     @Override
+     public Result getNext(DataByteArray ba) throws ExecException {
+         //the typed nulls select the getNext overload on the operands
+         DataByteArray left=null, right=null;
+
+         Result res = lhs.getNext(left);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         left = (DataByteArray)res.result;
+
+         res = rhs.getNext(right);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         right = (DataByteArray)res.result;
+
+         //compareTo only promises a sign, and "greater" is the
+         //positive one; the former test for -1 answered "less than"
+         return setComparisonResult(res, left.compareTo(right) > 0);
+     }
+
+     @Override
+     public Result getNext(Double d) throws ExecException {
+         Double left=null, right=null;
+
+         Result res = lhs.getNext(left);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         left = (Double)res.result;
+
+         res = rhs.getNext(right);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         right = (Double)res.result;
+
+         return setComparisonResult(res, left>right);
+     }
+
+     @Override
+     public Result getNext(Float f) throws ExecException {
+         Float left=null, right=null;
+
+         Result res = lhs.getNext(left);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         left = (Float)res.result;
+
+         res = rhs.getNext(right);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         right = (Float)res.result;
+
+         return setComparisonResult(res, left>right);
+     }
+
+     @Override
+     public Result getNext(Integer i) throws ExecException {
+         Integer left=null, right=null;
+
+         Result res = lhs.getNext(left);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         left = (Integer)res.result;
+
+         res = rhs.getNext(right);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         right = (Integer)res.result;
+
+         return setComparisonResult(res, left>right);
+     }
+
+     @Override
+     public Result getNext(Long l) throws ExecException {
+         Long left=null, right=null;
+
+         Result res = lhs.getNext(left);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         left = (Long)res.result;
+
+         res = rhs.getNext(right);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         right = (Long)res.result;
+
+         return setComparisonResult(res, left>right);
+     }
+
+     @Override
+     public Result getNext(String s) throws ExecException {
+         String left=null, right=null;
+
+         Result res = lhs.getNext(left);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         left = (String)res.result;
+
+         res = rhs.getNext(right);
+         if(res.returnStatus!=POStatus.STATUS_OK) {
+             return reportOperandNotOk(res);
+         }
+         right = (String)res.result;
+
+         return setComparisonResult(res, left.compareTo(right) > 0);
+     }
+ }
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/util/operatorHelper.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,34 @@
+package org.apache.pig.impl.physicalLayer.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+
+ /**
+  * Static helpers listing the data types handled by the physical
+  * operators, along with a printable name for each one.
+  */
+ public class operatorHelper {
+     /** Number of entries returned by genAllTypes. */
+     public static int numTypes(){
+         return genAllTypes().length;
+     }
+
+     /** Builds a fresh array of the DataType codes, in a fixed order. */
+     public static byte[] genAllTypes(){
+         return new byte[] {
+             DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY,
+             DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG,
+             DataType.MAP, DataType.TUPLE
+         };
+     }
+
+     //Names listed in the same order as the codes in genAllTypes
+     private static String[] genAllTypeNames(){
+         return new String[] {
+             "BAG", "BOOLEAN", "BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER", "LONG",
+             "MAP", "TUPLE"
+         };
+     }
+
+     /** Builds a new map from each type code to its name. */
+     public static Map<Byte, String> genTypeToNameMap(){
+         final byte[] codes = genAllTypes();
+         final String[] labels = genAllTypeNames();
+         final Map<Byte,String> byCode = new HashMap<Byte, String>();
+         int idx = 0;
+         for(byte code : codes){
+             byCode.put(code, labels[idx]);
+             idx++;
+         }
+         return byCode;
+     }
+ }
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java?rev=643092&r1=643091&r2=643092&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java Mon Mar 31 11:33:08 2008
@@ -18,9 +18,10 @@
package org.apache.pig.impl.plan;
+import java.io.Serializable;
+import java.lang.StringBuilder;
import java.util.ArrayList;
import java.util.List;
-import java.lang.StringBuilder;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -28,7 +29,7 @@
/**
* Base class for all types of operators.
*/
-abstract public class Operator<V extends PlanVisitor> {
+abstract public class Operator<V extends PlanVisitor> implements Serializable {
private static final long serialVersionUID = 1L;
/**
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,122 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Checks that ConstantExpression returns, for every getNext
+  * overload, the same value that was stored with setValue.
+  */
+ public class TestConstExpr {
+     Random r = new Random();
+     ConstantExpression ce = (ConstantExpression) GenPhyOp.exprConst();
+
+     @Before
+     public void setUp() throws Exception {
+     }
+
+     @After
+     public void tearDown() throws Exception {
+     }
+
+     @Test
+     public void testGetNextInteger() throws ExecException {
+         Integer expected = r.nextInt();
+         ce.setValue(expected);
+         assertEquals(expected, (Integer)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextLong() throws ExecException {
+         Long expected = r.nextLong();
+         ce.setValue(expected);
+         assertEquals(expected, (Long)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextDouble() throws ExecException {
+         Double expected = r.nextDouble();
+         ce.setValue(expected);
+         assertEquals(expected, (Double)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextFloat() throws ExecException {
+         Float expected = r.nextFloat();
+         ce.setValue(expected);
+         assertEquals(expected, (Float)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextString() throws ExecException {
+         String expected = GenRandomData.genRandString(r);
+         ce.setValue(expected);
+         assertEquals(expected, (String)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextDataByteArray() throws ExecException {
+         DataByteArray expected = GenRandomData.genRandDBA(r);
+         ce.setValue(expected);
+         assertEquals(expected, (DataByteArray)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextMap() throws ExecException {
+         Map<Integer,String> expected = GenRandomData.genRandMap(r, 10);
+         ce.setValue(expected);
+         Map<Integer,String> actual = (Map)ce.getNext(expected).result;
+         assertEquals(expected, actual);
+     }
+
+     @Test
+     public void testGetNextBoolean() throws ExecException {
+         Boolean expected = r.nextBoolean();
+         ce.setValue(expected);
+         assertEquals(expected, (Boolean)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextTuple() throws ExecException {
+         Tuple expected = GenRandomData.genRandSmallBagTuple(r, 10, 100);
+         ce.setValue(expected);
+         assertEquals(expected, (Tuple)ce.getNext(expected).result);
+     }
+
+     @Test
+     public void testGetNextDataBag() throws ExecException {
+         DataBag expected = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+         ce.setValue(expected);
+         assertEquals(expected, (DataBag)ce.getNext(expected).result);
+     }
+
+ }
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,45 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Exercises POFilter with two generated filters: one built from
+  * (50, 25), expected to let the tuple through, and one built from
+  * (25, 50), expected to report end of processing.
+  */
+ public class TestFilter {
+     POFilter pass;
+     POFilter fail;
+     Tuple t;
+
+     @Before
+     public void setUp() throws Exception {
+         pass = GenPhyOp.topFilterOpWithExPlan(50, 25);
+         fail = GenPhyOp.topFilterOpWithExPlan(25, 50);
+
+         t = GenRandomData.genRandSmallBagTuple(new Random(), 10, 100);
+     }
+
+     @After
+     public void tearDown() throws Exception {
+     }
+
+     @Test
+     public void testGetNextTuple() throws ExecException {
+         pass.attachInput(t);
+         Result res = pass.getNext(t);
+         assertEquals(t, res.result);
+         fail.attachInput(t);
+         res = fail.getNext(t);
+         //expected value goes first so a failure message reads correctly
+         assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+     }
+
+ }
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,68 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Exercises PhysicalOperator.processInput, first on an operator
+  * with no input operators and then on one that is fed by another
+  * operator.
+  */
+ public class TestPhyOp {
+     PhysicalOperator<PhyPlanVisitor> op;
+     PhysicalOperator<PhyPlanVisitor> inpOp;
+     Tuple t;
+
+     @Before
+     public void setUp() throws Exception {
+         op = GenPhyOp.topFilterOp();
+         inpOp = GenPhyOp.topFilterOpWithExPlan(25,10);
+         t = GenRandomData.genRandSmallBagTuple(new Random(), 10, 100);
+     }
+
+     @After
+     public void tearDown() throws Exception {
+     }
+
+     @Test
+     public void testProcessInput() throws ExecException {
+         //Stand-alone tests
+         Result res = op.processInput();
+         assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+         op.attachInput(t);
+         res = op.processInput();
+         assertEquals(POStatus.STATUS_OK, res.returnStatus);
+         assertEquals(t, res.result);
+         op.detachInput();
+         res = op.processInput();
+         assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+
+         //With input operator
+         List<PhysicalOperator<PhyPlanVisitor>> inp = new ArrayList<PhysicalOperator<PhyPlanVisitor>>();
+         inp.add(inpOp);
+         op.setInputs(inp);
+         //capture the result; otherwise the assertion below would
+         //only re-check the value from the previous call
+         res = op.processInput();
+         assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+
+         inpOp.attachInput(t);
+         res = op.processInput();
+         assertEquals(POStatus.STATUS_OK, res.returnStatus);
+         assertEquals(t, res.result);
+         inpOp.detachInput();
+         res = op.processInput();
+         assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+     }
+
+ }
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,54 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestProject {
+ Tuple typFinder;
+ Random r;
+
+ Tuple t;
+ Result res;
+ POProject proj;
+
+ @Before
+ public void setUp() throws Exception {
+ r = new Random();
+ typFinder = GenRandomData.genRandSmallBagTuple(r, 10, 100);
+ t = GenRandomData.genRandSmallBagTuple(r,10,100);
+ res = new Result();
+ proj = GenPhyOp.exprProject();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testGetNext() throws ExecException, IOException {
+ proj.attachInput(t);
+ for(int j=0;j<t.size();j++){
+ proj.attachInput(t);
+ proj.setColumn(j);
+
+ res = proj.getNext();
+ assertEquals(POStatus.STATUS_OK, res.returnStatus);
+ assertEquals(t.get(j), res.result);
+ }
+ }
+
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,134 @@
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LTOrEqualToExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.LessThanExpr;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
+
+/**
+ * Test helper that builds physical-layer operators. Every operator created here is
+ * identified by an OperatorKey made from an empty string and a random long.
+ */
+public class GenPhyOp {
+    /** Supplies the random long placed in every OperatorKey created by this class. */
+    static Random r = new Random();
+
+    /** Creates a ConstantExpression under a fresh random key; no value is set here. */
+    public static ConstantExpression exprConst(){
+        return new ConstantExpression(new OperatorKey("", r.nextLong()));
+    }
+
+    /** Creates a GreaterThanExpr under a fresh random key; no operands are set here. */
+    public static GreaterThanExpr compGreaterThanExpr(){
+        return new GreaterThanExpr(new OperatorKey("", r.nextLong()));
+    }
+
+    /** Creates a POProject under a fresh random key; no column is selected here. */
+    public static POProject exprProject(){
+        return new POProject(new OperatorKey("", r.nextLong()));
+    }
+
+    // The factories below are disabled; the imports of the operator classes they build
+    // are commented out at the top of this file as well.
+//
+//    public static GTOrEqualToExpr compGTOrEqualToExpr(){
+//        return new GTOrEqualToExpr(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static EqualToExpr compEqualToExpr(){
+//        return new EqualToExpr(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static NotEqualToExpr compNotEqualToExpr(){
+//        return new NotEqualToExpr(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static LessThanExpr compLessThanExpr(){
+//        return new LessThanExpr(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static LTOrEqualToExpr compLTOrEqualToExpr(){
+//        return new LTOrEqualToExpr(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static POLocalRearrange topLocalRearrangeOp(){
+//        return new POLocalRearrange(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static POGenerate topGenerateOp(){
+//        return new POGenerate(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static POLoad topLoadOp(){
+//        return new POLoad(new OperatorKey("", r.nextLong()));
+//    }
+//
+
+    /** Creates a POFilter under a fresh random key; no plan is attached here. */
+    public static POFilter topFilterOp(){
+        return new POFilter(new OperatorKey("", r.nextLong()));
+    }
+
+    /**
+     * Builds a POFilter whose expression plan is a GreaterThanExpr over two constant
+     * expressions, with the operand type set to DataType.INTEGER.
+     *
+     * @param lhsVal value set on the constant used as the left-hand side
+     * @param rhsVal value set on the constant used as the right-hand side
+     * @return a new filter with the three-node plan set on it
+     * @throws IOException propagated from the calls made here (presumably the ExprPlan
+     *         operations; TODO confirm against ExprPlan)
+     */
+    public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal) throws IOException{
+        // Operators are created in the order filter, left, right, comparison.
+        POFilter filter = topFilterOp();
+
+        ConstantExpression left = exprConst();
+        left.setValue(lhsVal);
+
+        ConstantExpression right = exprConst();
+        right.setValue(rhsVal);
+
+        GreaterThanExpr greater = compGreaterThanExpr();
+        greater.setLhs(left);
+        greater.setRhs(right);
+        greater.setOperandType(DataType.INTEGER);
+
+        ExprPlan plan = new ExprPlan();
+        plan.add(left);
+        plan.add(right);
+        plan.add(greater);
+
+        // Connect each constant to the comparison node.
+        plan.connect(left, greater);
+        plan.connect(right, greater);
+
+        filter.setPlan(plan);
+        return filter;
+    }
+
+    // More disabled factories; their imports are commented out at the top of this file too.
+//
+//    public static POGlobalRearrange topGlobalRearrangeOp(){
+//        return new POGlobalRearrange(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static POPackage topPackageOp(){
+//        return new POPackage(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static POStore topStoreOp(){
+//        return new POStore(new OperatorKey("", r.nextLong()));
+//    }
+//
+//    public static StartMap topStartMapOp(){
+//        return new StartMap(new OperatorKey("", r.nextLong()));
+//    }
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java?rev=643092&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java Mon Mar 31 11:33:08 2008
@@ -0,0 +1,111 @@
+package org.apache.pig.test.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Generators of random values, tuples and bags for tests. Every generator that takes a
+ * Random falls back to a fixed placeholder built around "RANDOM" when that Random is null.
+ */
+public class GenRandomData {
+
+    /**
+     * Returns a map filled by numEnt puts of a random int key with a random string value,
+     * or the single entry 1 -> "RANDOM" when r is null.
+     */
+    public static Map<Integer,String> genRandMap(Random r, int numEnt) {
+        Map<Integer,String> entries = new HashMap<Integer, String>();
+        if (r == null) {
+            entries.put(1, "RANDOM");
+        } else {
+            for (int added = 0; added < numEnt; added++) {
+                entries.put(r.nextInt(), genRandString(r));
+            }
+        }
+        return entries;
+    }
+
+    /** Returns ten random characters from 'A' to 'Z', or "RANDOM" when r is null. */
+    public static String genRandString(Random r){
+        if (r == null) {
+            return "RANDOM";
+        }
+        StringBuilder letters = new StringBuilder(10);
+        while (letters.length() < 10) {
+            letters.append((char) ('A' + r.nextInt(26)));
+        }
+        return letters.toString();
+    }
+
+    /** Wraps ten random bytes in a DataByteArray, or the bytes of "RANDOM" when r is null. */
+    public static DataByteArray genRandDBA(Random r){
+        byte[] payload;
+        if (r == null) {
+            payload = "RANDOM".getBytes();
+        } else {
+            payload = new byte[10];
+            r.nextBytes(payload);
+        }
+        return new DataByteArray(payload);
+    }
+
+    /** Builds the one-field tuple ("RANDOM") that is returned in place of random data. */
+    private static Tuple placeholderTuple(){
+        Tuple stub = new DefaultTuple();
+        stub.append("RANDOM");
+        return stub;
+    }
+
+    /**
+     * Returns the tuple (random string, random int in [0, limit)), or ("RANDOM") when r
+     * is null.
+     */
+    public static Tuple genRandSmallTuple(Random r, int limit){
+        if (r == null) {
+            return placeholderTuple();
+        }
+        // The string is drawn from r before the int, matching the field order.
+        return genRandSmallTuple(genRandString(r), r.nextInt(limit));
+    }
+
+    /** Returns the two-field tuple (s, value). */
+    public static Tuple genRandSmallTuple(String s, int value){
+        Tuple pair = new DefaultTuple();
+        pair.append(s);
+        pair.append(value);
+        return pair;
+    }
+
+    /**
+     * Returns a default bag holding num tuples from genRandSmallTuple(r, limit), or a bag
+     * holding only the ("RANDOM") tuple when r is null.
+     */
+    public static DataBag genRandSmallTupDataBag(Random r, int num, int limit){
+        DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+        if (r == null) {
+            bag.add(placeholderTuple());
+            return bag;
+        }
+        for (int added = 0; added < num; added++) {
+            bag.add(genRandSmallTuple(r, limit));
+        }
+        return bag;
+    }
+
+    /**
+     * Returns a nine-field tuple holding, in order, a bag of small tuples, a boolean, a
+     * DataByteArray, a string, a double, a float, an int, a long and a map; or the
+     * ("RANDOM") tuple when r is null.
+     */
+    public static Tuple genRandSmallBagTuple(Random r, int num, int limit){
+        if (r == null) {
+            return placeholderTuple();
+        }
+        Tuple row = new DefaultTuple();
+        row.append(genRandSmallTupDataBag(r, num, limit));
+        row.append(r.nextBoolean());
+        row.append(genRandDBA(r));
+        row.append(genRandString(r));
+        row.append(r.nextDouble());
+        row.append(r.nextFloat());
+        row.append(r.nextInt());
+        row.append(r.nextLong());
+        row.append(genRandMap(r, num));
+        return row;
+    }
+
+    /**
+     * Returns a default bag holding num tuples from genRandSmallBagTuple(r, num, limit),
+     * or a bag holding only the ("RANDOM") tuple when r is null.
+     */
+    public static DataBag genRandFullTupDataBag(Random r, int num, int limit){
+        DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+        if (r == null) {
+            bag.add(placeholderTuple());
+            return bag;
+        }
+        for (int added = 0; added < num; added++) {
+            bag.add(genRandSmallBagTuple(r, num, limit));
+        }
+        return bag;
+    }
+}