You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/04/17 23:39:30 UTC

svn commit: r649290 - in /incubator/pig/branches/types: ./ src/org/apache/pig/impl/physicalLayer/plans/ src/org/apache/pig/impl/physicalLayer/topLevelOperators/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

Author: gates
Date: Thu Apr 17 14:39:26 2008
New Revision: 649290

URL: http://svn.apache.org/viewvc?rev=649290&view=rev
Log:
incr4 patch from Shravan.  Adds POForeach, POLocalRearrange, POPackage and makes suggested changes to POStore and POLoad.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Apr 17 14:39:26 2008
@@ -141,7 +141,8 @@
         	    **/test/TestGTOrEqual.java,**/test/TestLessThan.java,**/test/TestLTOrEqual.java,
         	    **/test/TestEqualTo.java,**/test/TestNotEqualTo.java, **/test/TestPOGenerate.java,
                 **/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
-                 **/test/FakeFSOutputStream.java,
+                 **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
+        		**/test/TestLocalRearrange.java,
                 **/test/FakeFSInputStream.java, **/test/Util.java,
                 **/logicalLayer/*.java,
                 **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
@@ -249,6 +250,9 @@
                 	<include name="**/TestPOGenerate.java" />
                 	<include name="**/TestLoad.java" />
                 	<include name="**/TestStore.java" />
+                	<include name="**/TestPackage.java" />  
+                	<include name="**/TestLocalRearrange.java" /> 
+                	<include name="**/TestForEach.java" /> 
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Thu Apr 17 14:39:26 2008
@@ -22,12 +22,14 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.PlanVisitor;
 
 /**
@@ -44,7 +46,7 @@
  * @param <O>
  * @param <P>
  */
-public abstract class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
+public class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
 
     public PhyPlanVisitor(P plan) {
         super(plan);
@@ -67,11 +69,17 @@
         ExprPlanVisitor epv = new ExprPlanVisitor(fl.getPlan());
         epv.visit();
     }
-//    
-//    public void visitLocalRearrange(POLocalRearrange lr){
-//        //do nothing
-//    }
-//    
+    
+    public void visitLocalRearrange(POLocalRearrange lr) throws ParseException{
+        PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> ppv = new PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(lr.getPlan());
+        ppv.visit();
+    }
+    
+    public void visitForEach(POForEach fe) throws ParseException{
+        PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> ppv = new PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(fe.getPlan());
+        ppv.visit();
+    }
+    
 //    public void visitGlobalRearrange(POGlobalRearrange gr){
 //        //do nothing
 //    }
@@ -80,9 +88,9 @@
 //        //do nothing
 //    }
 //    
-//    public void visitPackage(POPackage pkg){
-//        //do nothing
-//    }
+    public void visitPackage(POPackage pkg){
+        //do nothing
+    }
     
     public void visitGenerate(POGenerate pogen) {
         //do nothing

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java Thu Apr 17 14:39:26 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.impl.physicalLayer.plans;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
@@ -31,7 +32,7 @@
  *
  * @param <E>
  */
-public abstract class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
+public class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
 
     public PhysicalPlan() {
         super();
@@ -51,5 +52,12 @@
     public void explain(OutputStream out){
         //Use a plan visitor and output the current physical
         //plan into out
+    }
+    
+    @Override
+    public void connect(E from, E to)
+            throws IOException {
+        super.connect(from, to);
+        to.setInputs(getPredecessors(to));
     }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+
+/**
+ * The foreach operator.
+ * It has an embedded physical plan whose leaf is a POGenerate
+ * that generates tuples as per the specification. One input
+ * tuple may yield zero, one or many output tuples.
+ */
+public class POForEach extends PhysicalOperator<PhyPlanVisitor> {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    PhysicalPlan<PhysicalOperator> plan;
+
+    POGenerate gen;
+    
+    //Since the plan has a generate, this needs to be maintained
+    //as the generate can potentially return multiple tuples for
+    //same call.
+    private boolean processingPlan = false;
+
+    public POForEach(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POForEach(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POForEach(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POForEach(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitForEach(this);
+    }
+
+    @Override
+    public String name() {
+        return "For Each - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    /**
+     * Overridden since the attachment of the new input should cause the old
+     * processing to end.
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        super.attachInput(t);
+        processingPlan = false;
+    }
+    
+    /**
+     * Calls getNext on the generate operator inside the nested
+     * physical plan and returns it maintaining an additional state
+     * to denote the begin and end of the nested plan processing.
+     * Input tuples for which the nested plan produces no output
+     * are skipped.
+     *
+     * @param t passed through to the generate operator's getNext
+     * @return the next tuple from the generate (STATUS_OK), any
+     *         other non-NULL, non-EOP result of the generate (e.g.
+     *         STATUS_ERR), or the EOP/ERR result of processInput
+     *         once the input is exhausted
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        //The nested plan is under processing
+        //So return tuples that the generate oper
+        //returns
+        if (processingPlan) {
+            Result res = nextFromGenerate(t);
+            if (res != null)
+                return res;
+        }
+        //The nested plan processing is done or is
+        //yet to begin. So process the input and start
+        //nested plan processing on the input tuple
+        //read
+        while (true) {
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+                return inp;
+            if (inp.returnStatus == POStatus.STATUS_NULL)
+                continue;
+            
+            plan.attachInput((Tuple) inp.result);
+            processingPlan = true;
+            
+            //Inspect what the generate returned (not the input's
+            //status). If the nested plan produced nothing for this
+            //input, move on to the next input tuple.
+            Result res = nextFromGenerate(t);
+            if (res != null)
+                return res;
+        }
+    }
+
+    /**
+     * Pulls the next result from the generate operator of the nested
+     * plan, skipping STATUS_NULL results.
+     *
+     * @param t passed through to the generate operator's getNext
+     * @return the first result that is neither STATUS_NULL nor
+     *         STATUS_EOP, or null when the generate signals EOP, in
+     *         which case processingPlan is reset to false
+     */
+    private Result nextFromGenerate(Tuple t) throws ExecException {
+        while (true) {
+            Result res = gen.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_NULL)
+                continue;
+            if (res.returnStatus == POStatus.STATUS_EOP) {
+                processingPlan = false;
+                return null;
+            }
+            return res;
+        }
+    }
+
+    public PhysicalPlan<PhysicalOperator> getPlan() {
+        return plan;
+    }
+
+    /**
+     * Sets the nested plan. Its first leaf is cast to POGenerate and
+     * is the operator getNext pulls tuples from.
+     */
+    public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
+        this.plan = plan;
+        gen = (POGenerate) plan.getLeaves().get(0);
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java Thu Apr 17 14:39:26 2008
@@ -94,6 +94,7 @@
      */
     private void tearDown() throws IOException{
         is.close();
+        setUpDone = false;
     }
     
     /**
@@ -128,7 +129,6 @@
                 res.returnStatus = POStatus.STATUS_OK;
         } catch (IOException e) {
             log.error("Received error from loader function: " + e);
-            res.returnStatus = POStatus.STATUS_ERR; 
             return res;
         }
         return res;

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+
+/**
+ * The local rearrange operator is a part of the co-group
+ * implementation. It has an embedded physical plan whose leaf
+ * is a POGenerate; the first field of every generated tuple is
+ * taken as the group key and the remaining fields are wrapped in
+ * an IndexedTuple tagged with this operator's index, giving
+ * output tuples of the form (grpKey, (indexed inp Tuple)).
+ */
+public class POLocalRearrange extends PhysicalOperator<PhyPlanVisitor> {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    PhysicalPlan<PhysicalOperator> plan;
+
+    // The position of this LR in the package operator
+    int index;
+
+    POGenerate gen;
+    
+    //Since the plan has a generate, this needs to be maintained
+    //as the generate can potentially return multiple tuples for
+    //same call.
+    private boolean processingPlan = false;
+
+    public POLocalRearrange(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POLocalRearrange(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POLocalRearrange(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POLocalRearrange(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        index = -1;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitLocalRearrange(this);
+    }
+
+    @Override
+    public String name() {
+        return "Local Rearrange - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+    public void setIndex(int index) {
+        this.index = index;
+    }
+    
+    /**
+     * Overridden since the attachment of the new input should cause the old
+     * processing to end.
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        super.attachInput(t);
+        processingPlan = false;
+    }
+    
+    /**
+     * Calls getNext on the generate operator inside the nested
+     * physical plan. Converts the generated tuple into the proper
+     * format, i.e. (key, indexed value tuple). Input tuples for
+     * which the nested plan produces no output are skipped.
+     *
+     * @param t passed through to the generate operator's getNext
+     * @return the next rearranged tuple (STATUS_OK), any other
+     *         non-NULL, non-EOP result of the generate (e.g.
+     *         STATUS_ERR), or the EOP/ERR result of processInput
+     *         once the input is exhausted
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        //The nested plan is under processing
+        //So return tuples that the generate oper
+        //returns after converting them to the required
+        //format
+        if (processingPlan) {
+            Result res = nextFromGenerate(t);
+            if (res != null)
+                return res;
+        }
+        //The nested plan processing is done or is
+        //yet to begin. So process the input and start
+        //nested plan processing on the input tuple
+        //read
+        while (true) {
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+                return inp;
+            if (inp.returnStatus == POStatus.STATUS_NULL)
+                continue;
+            
+            plan.attachInput((Tuple) inp.result);
+            processingPlan = true;
+            
+            //Inspect what the generate returned (not the input's
+            //status). If the nested plan produced nothing for this
+            //input, move on to the next input tuple.
+            Result res = nextFromGenerate(t);
+            if (res != null)
+                return res;
+        }
+    }
+
+    /**
+     * Pulls the next result from the generate operator of the nested
+     * plan, skipping STATUS_NULL results. Successful results are
+     * converted to the local rearrange output format.
+     *
+     * @param t passed through to the generate operator's getNext
+     * @return the first result that is neither STATUS_NULL nor
+     *         STATUS_EOP, or null when the generate signals EOP, in
+     *         which case processingPlan is reset to false
+     */
+    private Result nextFromGenerate(Tuple t) throws ExecException {
+        while (true) {
+            Result res = gen.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_NULL)
+                continue;
+            if (res.returnStatus == POStatus.STATUS_EOP) {
+                processingPlan = false;
+                return null;
+            }
+            if (res.returnStatus == POStatus.STATUS_OK)
+                res.result = constructLROutput((Tuple) res.result);
+            return res;
+        }
+    }
+    
+    /**
+     * Converts a generated tuple into (key, IndexedTuple): the first
+     * field is removed from genOut via getAll().remove(0) and used as
+     * the key; the remaining tuple is wrapped with this LR's index.
+     * NOTE(review): relies on getAll() returning the tuple's backing
+     * list, so genOut itself is modified - confirm.
+     */
+    private Tuple constructLROutput(Tuple genOut){
+        //Strip the input tuple off its key which
+        //will be the first field in the tuple
+        Object key = genOut.getAll().remove(0);
+        
+        //Create the indexed tuple out of the value
+        //that is remaining in the input tuple
+        IndexedTuple it = new IndexedTuple(genOut, index);
+        
+        //Put the key and the indexed tuple
+        //in a tuple and return
+        Tuple outPut = new DefaultTuple();
+        outPut.append(key);
+        outPut.append(it);
+        return outPut;
+    }
+
+    public PhysicalPlan<PhysicalOperator> getPlan() {
+        return plan;
+    }
+
+    /**
+     * Sets the nested plan. Its first leaf is cast to POGenerate and
+     * is the operator getNext pulls tuples from.
+     */
+    public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
+        this.plan = plan;
+        gen = (POGenerate) plan.getLeaves().get(0);
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+/**
+ * The package operator that packages
+ * the globally rearranged tuples into
+ * output format as required by co-group.
+ * This is the last stage of processing co-group.
+ * This operator has a slightly different
+ * format than other operators in that it
+ * takes two things as input: the key being
+ * worked on and an iterator of indexed tuples
+ * that just need to be packaged into their
+ * appropriate output bags based on the index.
+ */
+public class POPackage extends PhysicalOperator<PhyPlanVisitor> {
+    //The iterator of indexed Tuples
+    //that is typically provided by
+    //Hadoop
+    Iterator<IndexedTuple> indTupIter;
+    
+    //The key being worked on
+    Object key;
+
+    //The number of inputs to this
+    //co-group
+    int numInputs;
+    
+    //Denotes if inner is specified
+    //on a particular input
+    boolean[] inner;
+    
+    private final Log log = LogFactory.getLog(getClass());
+
+    public POPackage(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POPackage(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POPackage(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        numInputs = -1;
+    }
+
+    @Override
+    public String name() {
+        return "Package - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitPackage(this);
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    /**
+     * Attaches the required inputs
+     * @param k - the key being worked on
+     * @param inp - iterator of indexed tuples typically
+     *              obtained from Hadoop
+     */
+    public void attachInput(Object k, Iterator<IndexedTuple> inp) {
+        indTupIter = inp;
+        key = k;
+    }
+
+    /**
+     * Clears the key and the iterator set by
+     * {@link #attachInput(Object, Iterator)}.
+     */
+    public void detachInput() {
+        indTupIter = null;
+        key = null;
+    }
+
+    public int getNumInps() {
+        return numInputs;
+    }
+
+    public void setNumInps(int numInps) {
+        this.numInputs = numInps;
+    }
+    
+    public boolean[] getInner() {
+        return inner;
+    }
+
+    /**
+     * @param inner inner[i] is true if input i was marked inner; a
+     *              null array means no input is inner
+     */
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
+
+    /**
+     * From the inputs, constructs the output tuple
+     * for this co-group in the required format which
+     * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
+     * Before returning a result, the attached input is consumed and detached.
+     *
+     * @return STATUS_OK with the output tuple; STATUS_NULL if an
+     *         input marked inner contributed no tuples for this key;
+     *         STATUS_ERR if the output tuple could not be built
+     * @throws ExecException if no input is attached or the number
+     *         of inputs has not been set
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if(indTupIter==null){
+            throw new ExecException("Incorrect usage of the Package operator. " +
+                    "No input has been attached.");
+        }
+        if(numInputs < 0){
+            throw new ExecException("Incorrect usage of the Package operator. " +
+                    "The number of inputs has not been set.");
+        }
+        
+        //Create numInputs bags
+        DataBag[] dbs = new DataBag[numInputs];
+        for (int i = 0; i < numInputs; i++) {
+            dbs[i] = DefaultBagFactory.getInstance().newDefaultBag();
+        }
+        
+        //For each indexed tup in the inp, sort them
+        //into their corresponding bags based
+        //on the index
+        while (indTupIter.hasNext()) {
+            IndexedTuple it = indTupIter.next();
+            dbs[it.index].add(it.toTuple());
+        }
+        
+        //Construct the output tuple by appending
+        //the key and all the above constructed bags
+        //and return it.
+        Tuple res;
+        try{
+            res = TupleFactory.getInstance().newTuple(numInputs+1);
+            res.set(0,key);
+            for (int i = 0; i < numInputs; i++) {
+                //An empty bag on an inner input means
+                //this key produces no output at all
+                if(isInner(i) && dbs[i].size()==0){
+                    detachInput();
+                    Result r = new Result();
+                    r.returnStatus = POStatus.STATUS_NULL;
+                    return r;
+                }
+                res.set(i+1,dbs[i]);
+            }
+        }catch(IOException e){
+            log.error("Received error while constructing the output tuple", e);
+            detachInput();
+            Result r = new Result();
+            r.returnStatus = POStatus.STATUS_ERR;
+            return r;
+        }
+        detachInput();
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+
+    /**
+     * @return true if input i was marked inner via setInner
+     */
+    private boolean isInner(int i) {
+        return inner != null && i < inner.length && inner[i];
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java Thu Apr 17 14:39:26 2008
@@ -21,6 +21,8 @@
 import java.io.OutputStream;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
@@ -51,6 +53,8 @@
     // PigContext passed to us by the operator creator
     PigContext pc;
     
+    private final Log log = LogFactory.getLog(getClass());
+    
     public POStore(OperatorKey k) {
         this(k, -1, null);
     }
@@ -116,6 +120,12 @@
     public Result store() throws ExecException{
         try{
             setUp();
+        }catch (IOException e) {
+            ExecException ee = new ExecException("Unable to setup the storer because of the exception: " + e.getMessage());
+            ee.initCause(e);
+            throw ee;
+        }
+        try{
             Result res;
             Tuple inpValue = null;
             while(true){
@@ -136,7 +146,7 @@
             tearDown();
             return res;
         }catch(IOException e){
-            e.printStackTrace();
+            log.error("Received error from storer function: " + e);
             return new Result();
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java Thu Apr 17 14:39:26 2008
@@ -200,7 +200,7 @@
         Result res = new Result();
         Tuple inpValue = null;
         if (input == null && inputs == null) {
-            log.warn("No inputs found. Signaling End of Processing.");
+//            log.warn("No inputs found. Signaling End of Processing.");
             res.returnStatus = POStatus.STATUS_EOP;
             return res;
         }

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests foreach db generate $0.
+ * Builds the projection of the input bag onto column $0 and checks that
+ * every tuple generated by the foreach is in that projected bag and that
+ * exactly one tuple is generated per input tuple.
+ */
+public class TestForEach {
+
+    /** Operator under test. */
+    POForEach fe;
+    /** Never assigned; passed to getNext() presumably only to select the Tuple overload. */
+    Tuple t;
+    /** Random input bag. */
+    DataBag db;
+    /** db projected onto column $0: the expected foreach output. */
+    DataBag projDB;
+
+    /**
+     * Builds a random input bag, its expected projection, and a POForEach
+     * whose input is a POProject over a tuple wrapping the bag.
+     */
+    @Before
+    public void setUp() throws Exception {
+        Random r = new Random();
+        db = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        projDB = TestHelper.projectBag(db, 0);
+        fe = GenPhyOp.topForEachOPWithPlan(0, db.iterator().next());
+        POProject proj = GenPhyOp.exprProject();
+        proj.setColumn(0);
+        proj.setResultType(DataType.TUPLE);
+        proj.setOverloaded(true);
+        // Distinct local name so the field 't' is not shadowed.
+        Tuple wrapper = new DefaultTuple();
+        wrapper.append(db);
+        proj.attachInput(wrapper);
+        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
+        inputs.add(proj);
+        fe.setInputs(inputs);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Drains the foreach until EOP. Each emitted tuple must be in projDB,
+     * and the number emitted must equal the input bag size.
+     */
+    @Test
+    public void testGetNextTuple() throws ExecException, IOException {
+        int size = 0;
+        for (Result res = fe.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = fe.getNext(t)) {
+            Tuple out = (Tuple) res.result;
+            assertTrue("unexpected tuple " + out, TestHelper.bagContains(projDB, out));
+            ++size;
+        }
+        assertEquals(db.size(), size);
+    }
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java Thu Apr 17 14:39:26 2008
@@ -162,10 +162,6 @@
         return true;
     }
 
-    public static void main(String[] args) throws ExecException {
-
-    }
-
     @Test
     public void testOperator() throws ExecException {
         int TRIALS = 10;

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests POLocalRearrange as used for "group db by $0".
+ * Each output tuple must be (key, IndexedTuple): the index is the input
+ * index (0), the value tuple comes from db, and the key equals column $0
+ * of the value tuple.
+ */
+public class TestLocalRearrange {
+
+    /** Operator under test. */
+    POLocalRearrange lr;
+    /** Never assigned; passed to getNext() presumably only to select the Tuple overload. */
+    Tuple t;
+    /** Random input bag. */
+    DataBag db;
+
+    /**
+     * Builds a random input bag and a POLocalRearrange whose input is a
+     * POProject over a tuple wrapping the bag.
+     */
+    @Before
+    public void setUp() throws Exception {
+        Random r = new Random();
+        db = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 0, db.iterator().next());
+        POProject proj = GenPhyOp.exprProject();
+        proj.setColumn(0);
+        proj.setResultType(DataType.TUPLE);
+        proj.setOverloaded(true);
+        // Distinct local name so the field 't' is not shadowed.
+        Tuple wrapper = new DefaultTuple();
+        wrapper.append(db);
+        proj.attachInput(wrapper);
+        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
+        inputs.add(proj);
+        lr.setInputs(inputs);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testGetNextTuple() throws ExecException, IOException {
+        int size = 0;
+        for (Result res = lr.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNext(t)) {
+            Tuple out = (Tuple) res.result;
+            IndexedTuple value = (IndexedTuple) out.get(1);
+            // The index must be the input index; it is an int, so compare exactly.
+            assertEquals(0, value.index);
+
+            // The input bag must contain the value tuple.
+            assertTrue(TestHelper.bagContains(db, value.toTuple()));
+
+            // The output key must equal column $0 of the value tuple.
+            assertEquals(value.toTuple().get(0), out.get(0));
+            ++size;
+        }
+
+        // One output tuple must be generated per input tuple.
+        assertEquals(db.size(), size);
+    }
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java Thu Apr 17 14:39:26 2008
@@ -61,208 +61,208 @@
 
 public class TestPOGenerate extends TestCase {
 
-	DataBag cogroup;
-	DataBag partialFlatten;
-	DataBag simpleGenerate;
-	Random r = new Random();
-	BagFactory bf = BagFactory.getInstance();
-	TupleFactory tf = TupleFactory.getInstance();
-	
-	@Before
-	public void setUp() throws Exception {
-		Tuple [] inputA = new Tuple[4];
-		Tuple [] inputB = new Tuple[4];
-		for(int i = 0; i < 4; i++) {
-			inputA[i] = tf.newTuple(2);
-			inputB[i] = tf.newTuple(1);
-		}
-		inputA[0].set(0, 'a');
-		inputA[0].set(1, '1');
-		inputA[1].set(0, 'b');
-		inputA[1].set(1, '1');
-		inputA[2].set(0, 'a');
-		inputA[2].set(1, '1');
-		inputA[3].set(0, 'c');
-		inputA[3].set(1, '1');
-		inputB[0].set(0, 'b');
-		inputB[1].set(0, 'b');
-		inputB[2].set(0, 'a');
-		inputB[3].set(0, 'd');
-		DataBag cg11 = bf.newDefaultBag();
-		cg11.add(inputA[0]);
-		cg11.add(inputA[2]);
-		DataBag cg21 = bf.newDefaultBag();
-		cg21.add(inputA[1]);
-		DataBag cg31 = bf.newDefaultBag();
-		cg31.add(inputA[3]);
-		DataBag emptyBag = bf.newDefaultBag();
-		DataBag cg12 = bf.newDefaultBag();
-		cg12.add(inputB[2]);
-		DataBag cg22 = bf.newDefaultBag();
-		cg22.add(inputB[0]);
-		cg22.add(inputB[1]);
-		DataBag cg42 = bf.newDefaultBag();
-		cg42.add(inputB[3]);
-		Tuple [] tIn = new Tuple[4];
-		for(int i = 0; i < 4; ++i) {
-			tIn[i] = tf.newTuple(2);
-		}
-		tIn[0].set(0, cg11);
-		tIn[0].set(1, cg12);
-		tIn[1].set(0, cg21);
-		tIn[1].set(1, cg22);
-		tIn[2].set(0, cg31);
-		tIn[2].set(1, emptyBag);
-		tIn[3].set(0, emptyBag);
-		tIn[3].set(1, cg42);
-		
-		cogroup = bf.newDefaultBag();
-		for(int i = 0; i < 4; ++i) {
-			cogroup.add(tIn[i]);
-		}
-		
-		Tuple[] tPartial = new Tuple[4];
-		for(int i = 0; i < 4; ++i) {
-			tPartial[i] = tf.newTuple(2);
-			tPartial[i].set(0, inputA[i].get(0));
-			tPartial[i].set(1, inputA[i].get(1));
-		}
-		
-		tPartial[0].append(cg12);
-		
-		tPartial[1].append(cg22);
-		
-		tPartial[2].append(cg12);
-		
-		tPartial[3].append(emptyBag);
-		
-		partialFlatten = bf.newDefaultBag();
-		for(int i = 0; i < 4; ++i) {
-			partialFlatten.add(tPartial[i]);
-		}
-		
-		simpleGenerate = bf.newDefaultBag();
-		for(int i = 0; i < 4; ++i) {
-			simpleGenerate.add(inputA[i]);
-		}
-		/*
-		System.out.println("Cogroup : " + cogroup);
-		System.out.println("Partial : " + partialFlatten);
-		System.out.println("Simple : " + simpleGenerate);
-		*/
-	}
-	
-	public void testJoin() throws Exception {
-		ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-		prj1.setResultType(DataType.BAG);
-		prj2.setResultType(DataType.BAG);
-		List<Boolean> toBeFlattened = new LinkedList<Boolean>();
-		toBeFlattened.add(true);
-		toBeFlattened.add(true);
-		ExprPlan plan1 = new ExprPlan();
-		plan1.add(prj1);
-		ExprPlan plan2 = new ExprPlan();
-		plan2.add(prj2);
-		List<ExprPlan> inputs = new LinkedList<ExprPlan>();
-		inputs.add(plan1); 
-		inputs.add(plan2);
-		PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-		//DataBag obtained = bf.newDefaultBag();
-		for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan1.attachInput(t); 
-			plan2.attachInput(t);
-			Result output = poGen.getNext(t);
-			while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
-				//System.out.println(output.result);
-				Tuple tObtained = (Tuple) output.result;
-				assertTrue(tObtained.get(0).toString().equals(tObtained.get(2).toString()));
-				//obtained.add((Tuple) output.result);
-				output = poGen.getNext(t);
-			}
-		}
-		
-	}
-	
-	public void testPartialJoin() throws Exception {
-		ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-		prj1.setResultType(DataType.BAG);
-		prj2.setResultType(DataType.BAG);
-		List<Boolean> toBeFlattened = new LinkedList<Boolean>();
-		toBeFlattened.add(true);
-		toBeFlattened.add(false);
-		ExprPlan plan1 = new ExprPlan();
-		plan1.add(prj1);
-		ExprPlan plan2 = new ExprPlan();
-		plan2.add(prj2);
-		List<ExprPlan> inputs = new LinkedList<ExprPlan>();
-		inputs.add(plan1); 
-		inputs.add(plan2);
-		PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-		
-		//DataBag obtained = bf.newDefaultBag();
-		List<String> obtained = new LinkedList<String>();
-		for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan1.attachInput(t); 
-			plan2.attachInput(t);
-			Result output = poGen.getNext(t);
-			while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
-				//System.out.println(output.result);
-				obtained.add(((Tuple) output.result).toString());
-				output = poGen.getNext(t);
-			}
-		}
-		int count = 0;
-		for(Iterator<Tuple> it = partialFlatten.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			assertTrue(obtained.contains(t.toString()));
-			++count;
-		}
-		assertEquals(partialFlatten.size(), count);
-		
-	}
-	
-	public void testSimpleGenerate() throws Exception {
-		ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-		prj1.setResultType(DataType.BAG);
-		prj2.setResultType(DataType.BAG);
-		List<Boolean> toBeFlattened = new LinkedList<Boolean>();
-		toBeFlattened.add(true);
-		toBeFlattened.add(false);
-		ExprPlan plan1 = new ExprPlan();
-		plan1.add(prj1);
-		ExprPlan plan2 = new ExprPlan();
-		plan2.add(prj2);
-		List<ExprPlan> inputs = new LinkedList<ExprPlan>();
-		inputs.add(plan1); 
-		inputs.add(plan2);
-		PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-		
-		//DataBag obtained = bf.newDefaultBag();
-		List<String> obtained = new LinkedList<String>();
-		for(Iterator<Tuple> it = simpleGenerate.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			plan1.attachInput(t); 
-			plan2.attachInput(t);
-			Result output = poGen.getNext(t);
-			while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
-				//System.out.println(output.result);
-				obtained.add(((Tuple) output.result).toString());
-				output = poGen.getNext(t);
-			}
-		}
-		
-		int count = 0;
-		for(Iterator<Tuple> it = simpleGenerate.iterator(); it.hasNext(); ) {
-			Tuple t = it.next();
-			assertTrue(obtained.contains(t.toString()));
-			++count;
-		}
-		assertEquals(simpleGenerate.size(), count);
-		
-	}
+    // Cogrouped input: each tuple is (bag of input-A tuples, bag of input-B tuples) for one key.
+    DataBag cogroup;
+    // Expected output of generating (flatten($0), $1) over cogroup.
+    DataBag partialFlatten;
+    // The input-A tuples themselves; used both as input and as expected output.
+    DataBag simpleGenerate;
+    Random r = new Random();
+    BagFactory bf = BagFactory.getInstance();
+    TupleFactory tf = TupleFactory.getInstance();
+    
+    /**
+     * Builds the fixtures from two small char-valued inputs:
+     * A = (a,1) (b,1) (a,1) (c,1) and B = (b) (b) (a) (d).
+     * cogroup groups them by key as
+     * a -> ({(a,1),(a,1)}, {(a)}), b -> ({(b,1)}, {(b),(b)}),
+     * c -> ({(c,1)}, {}) and d -> ({}, {(d)}).
+     */
+    @Before
+    public void setUp() throws Exception {
+        Tuple [] inputA = new Tuple[4];
+        Tuple [] inputB = new Tuple[4];
+        for(int i = 0; i < 4; i++) {
+            inputA[i] = tf.newTuple(2);
+            inputB[i] = tf.newTuple(1);
+        }
+        inputA[0].set(0, 'a');
+        inputA[0].set(1, '1');
+        inputA[1].set(0, 'b');
+        inputA[1].set(1, '1');
+        inputA[2].set(0, 'a');
+        inputA[2].set(1, '1');
+        inputA[3].set(0, 'c');
+        inputA[3].set(1, '1');
+        inputB[0].set(0, 'b');
+        inputB[1].set(0, 'b');
+        inputB[2].set(0, 'a');
+        inputB[3].set(0, 'd');
+        // Per-key bags: cgK1 holds input-A tuples, cgK2 holds input-B tuples.
+        DataBag cg11 = bf.newDefaultBag();
+        cg11.add(inputA[0]);
+        cg11.add(inputA[2]);
+        DataBag cg21 = bf.newDefaultBag();
+        cg21.add(inputA[1]);
+        DataBag cg31 = bf.newDefaultBag();
+        cg31.add(inputA[3]);
+        DataBag emptyBag = bf.newDefaultBag();
+        DataBag cg12 = bf.newDefaultBag();
+        cg12.add(inputB[2]);
+        DataBag cg22 = bf.newDefaultBag();
+        cg22.add(inputB[0]);
+        cg22.add(inputB[1]);
+        DataBag cg42 = bf.newDefaultBag();
+        cg42.add(inputB[3]);
+        Tuple [] tIn = new Tuple[4];
+        for(int i = 0; i < 4; ++i) {
+            tIn[i] = tf.newTuple(2);
+        }
+        tIn[0].set(0, cg11);
+        tIn[0].set(1, cg12);
+        tIn[1].set(0, cg21);
+        tIn[1].set(1, cg22);
+        tIn[2].set(0, cg31);
+        tIn[2].set(1, emptyBag);
+        tIn[3].set(0, emptyBag);
+        tIn[3].set(1, cg42);
+        
+        cogroup = bf.newDefaultBag();
+        for(int i = 0; i < 4; ++i) {
+            cogroup.add(tIn[i]);
+        }
+        
+        // partialFlatten: each A tuple followed by the B bag of its key
+        // (an empty bag for c, which has no B tuples).
+        Tuple[] tPartial = new Tuple[4];
+        for(int i = 0; i < 4; ++i) {
+            tPartial[i] = tf.newTuple(2);
+            tPartial[i].set(0, inputA[i].get(0));
+            tPartial[i].set(1, inputA[i].get(1));
+        }
+        
+        tPartial[0].append(cg12);
+        
+        tPartial[1].append(cg22);
+        
+        tPartial[2].append(cg12);
+        
+        tPartial[3].append(emptyBag);
+        
+        partialFlatten = bf.newDefaultBag();
+        for(int i = 0; i < 4; ++i) {
+            partialFlatten.add(tPartial[i]);
+        }
+        
+        simpleGenerate = bf.newDefaultBag();
+        for(int i = 0; i < 4; ++i) {
+            simpleGenerate.add(inputA[i]);
+        }
+    }
+    
+    /**
+     * Generates (flatten($0), flatten($1)) over cogroup, which joins the A and
+     * B tuples on their key (cross product within each group). Every joined
+     * tuple is (keyA, valueA, keyB), so positions 0 and 2 must match.
+     */
+    public void testJoin() throws Exception {
+        ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.BAG);
+        prj2.setResultType(DataType.BAG);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(true);
+        toBeFlattened.add(true);
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+        ExprPlan plan2 = new ExprPlan();
+        plan2.add(prj2);
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+        int joined = 0;
+        for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan1.attachInput(t);
+            plan2.attachInput(t);
+            // Drain every tuple generated for this cogroup tuple.
+            Result output = poGen.getNext(t);
+            while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+                Tuple tObtained = (Tuple) output.result;
+                assertTrue(tObtained.get(0).toString().equals(tObtained.get(2).toString()));
+                ++joined;
+                output = poGen.getNext(t);
+            }
+        }
+        // Without this check the test would pass vacuously if nothing were generated.
+        assertTrue("join produced no tuples", joined > 0);
+    }
+    
+    /**
+     * Generates (flatten($0), $1) over cogroup. The output, compared by
+     * toString(), must contain every tuple of partialFlatten and must have
+     * the same number of tuples.
+     */
+    public void testPartialJoin() throws Exception {
+        ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.BAG);
+        prj2.setResultType(DataType.BAG);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(true);
+        toBeFlattened.add(false);
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+        ExprPlan plan2 = new ExprPlan();
+        plan2.add(prj2);
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+
+        // Collect every generated tuple as a string for comparison.
+        List<String> obtained = new LinkedList<String>();
+        for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan1.attachInput(t);
+            plan2.attachInput(t);
+            Result output = poGen.getNext(t);
+            while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+                obtained.add(((Tuple) output.result).toString());
+                output = poGen.getNext(t);
+            }
+        }
+        for(Iterator<Tuple> it = partialFlatten.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            assertTrue(obtained.contains(t.toString()));
+        }
+        // Rule out extra output tuples as well as missing ones.
+        assertEquals(partialFlatten.size(), obtained.size());
+    }
+    
+    /**
+     * Generates ($0, $1) from the plain A tuples. Column 0 is marked for
+     * flattening but holds a scalar. The output must reproduce the input:
+     * every input tuple appears and no extra tuples are produced.
+     */
+    public void testSimpleGenerate() throws Exception {
+        ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.BAG);
+        prj2.setResultType(DataType.BAG);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(true);
+        toBeFlattened.add(false);
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+        ExprPlan plan2 = new ExprPlan();
+        plan2.add(prj2);
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+
+        // Collect every generated tuple as a string for comparison.
+        List<String> obtained = new LinkedList<String>();
+        for(Iterator<Tuple> it = simpleGenerate.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan1.attachInput(t);
+            plan2.attachInput(t);
+            Result output = poGen.getNext(t);
+            while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+                obtained.add(((Tuple) output.result).toString());
+                output = poGen.getNext(t);
+            }
+        }
+
+        for(Iterator<Tuple> it = simpleGenerate.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            assertTrue(obtained.contains(t.toString()));
+        }
+        // Rule out extra output tuples as well as missing ones.
+        assertEquals(simpleGenerate.size(), obtained.size());
+    }
 }

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests POPackage. Indexed tuples from two random inputs that share one key
+ * are packaged into (key, bag of input 0, bag of input 1). Keys of every data
+ * type are tried. In the inner case, input 0 contributes no tuples and a
+ * STATUS_NULL result is accepted.
+ */
+public class TestPackage {
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Adapts an iterator over tuples that are known to be IndexedTuples into
+     * the Iterator/Iterable of IndexedTuple handed to POPackage.attachInput.
+     */
+    static class ITIterator implements Iterator<IndexedTuple>,
+            Iterable<IndexedTuple> {
+        private final Iterator<Tuple> it;
+
+        public ITIterator(Iterator<Tuple> it) {
+            this.it = it;
+        }
+
+        public boolean hasNext() {
+            return it.hasNext();
+        }
+
+        /** Throws ClassCastException if the next tuple is not an IndexedTuple. */
+        public IndexedTuple next() {
+            return (IndexedTuple) it.next();
+        }
+
+        /** Removal is not supported; fail loudly rather than silently ignoring the call. */
+        public void remove() {
+            throw new UnsupportedOperationException("remove");
+        }
+
+        /** Returns this object itself, so it can be iterated only once. */
+        public Iterator<IndexedTuple> iterator() {
+            return this;
+        }
+
+    }
+
+    /**
+     * Packages two random bags under the given key and checks the result.
+     *
+     * @param key   the group key attached to the POPackage
+     * @param inner per-input inner flags. When inner[0] is set, input 0
+     *              contributes no tuples and a STATUS_NULL result counts as a pass.
+     * @return true if the output is (key, db1, db2), or if it is STATUS_NULL
+     *         in the inner case
+     */
+    public static boolean test(Object key, boolean[] inner) throws ExecException, IOException {
+        Random r = new Random();
+        DataBag db1 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        DataBag db2 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
+        if (!inner[0]) {
+            for (Iterator<Tuple> it = db1.iterator(); it.hasNext();) {
+                db.add(new IndexedTuple(it.next(), 0));
+            }
+        }
+        for (Iterator<Tuple> it = db2.iterator(); it.hasNext();) {
+            db.add(new IndexedTuple(it.next(), 1));
+        }
+        ITIterator iti = new TestPackage.ITIterator(db.iterator());
+        POPackage pop = new POPackage(new OperatorKey("", r.nextLong()));
+        pop.setNumInps(2);
+        pop.setInner(inner);
+        pop.attachInput(key, iti);
+        Tuple t = null;
+        Result res = pop.getNext(t);
+        if (res.returnStatus == POStatus.STATUS_NULL && inner[0])
+            return true;
+        if (res.returnStatus != POStatus.STATUS_OK)
+            return false;
+
+        t = (Tuple) res.result;
+        Object outKey = t.get(0);
+        DataBag outDb1 = (DataBag) t.get(1);
+        DataBag outDb2 = (DataBag) t.get(2);
+
+        // Compare keys by value: the package need not hand back the same instance.
+        boolean sameKey = outKey == key || (key != null && key.equals(outKey));
+        return sameKey && TestHelper.compareBags(db1, outDb1)
+                && TestHelper.compareBags(db2, outDb2);
+    }
+
+    /**
+     * Runs {@link #test(Object, boolean[])} with a random key of data type t.
+     * This shows that POPackage has no type-specific code.
+     *
+     * @return the keyed test's result, or false for a type not handled here
+     */
+    private static boolean test(byte t, boolean[] inner) throws ExecException, IOException {
+        Random r = new Random();
+        switch (t) {
+        case DataType.BAG:
+            return test(GenRandomData.genRandSmallTupDataBag(r, 10, 100), inner);
+        case DataType.BOOLEAN:
+            return test(r.nextBoolean(), inner);
+        case DataType.BYTEARRAY:
+            return test(GenRandomData.genRandDBA(r), inner);
+        case DataType.CHARARRAY:
+            return test(GenRandomData.genRandString(r), inner);
+        case DataType.DOUBLE:
+            return test(r.nextDouble(), inner);
+        case DataType.FLOAT:
+            return test(r.nextFloat(), inner);
+        case DataType.INTEGER:
+            // Must be an Integer key; nextLong() would silently test a Long instead.
+            return test(r.nextInt(), inner);
+        case DataType.LONG:
+            return test(r.nextLong(), inner);
+        case DataType.MAP:
+            return test(GenRandomData.genRandMap(r, 10), inner);
+        case DataType.TUPLE:
+            return test(GenRandomData.genRandSmallBagTuple(r, 10, 100), inner);
+        }
+        return false;
+    }
+
+    /** Runs NUM_TRIALS outer and inner package tests for every data type. */
+    @Test
+    public void testOperator() throws ExecException, IOException {
+        final int NUM_TRIALS = 10;
+        boolean[] inner1 = { false, false };
+        boolean[] inner2 = { true, false };
+        for (byte b : DataType.genAllTypes()) {
+            for (int i = 0; i < NUM_TRIALS; i++) {
+                assertEquals("package failed for type " + b, Boolean.TRUE, test(b, inner1));
+            }
+            for (int i = 0; i < NUM_TRIALS; i++) {
+                assertEquals("inner package failed for type " + b, Boolean.TRUE, test(b, inner2));
+            }
+        }
+    }
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Thu Apr 17 14:39:26 2008
@@ -18,23 +18,32 @@
 package org.apache.pig.test.utils;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+// import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+// import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+// import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+// import
+// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
+// import
+// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
@@ -44,139 +53,278 @@
 
 public class GenPhyOp {
+    // Source of the random ids passed to every OperatorKey this class builds.
     static Random r = new Random();
-    public static ConstantExpression exprConst(){
-        ConstantExpression ret = new ConstantExpression(new OperatorKey("",r.nextLong()));
+
+    /**
+     * Creates a ConstantExpression keyed by an OperatorKey built from an
+     * empty string and a random long drawn from {@code r}. No value is set
+     * here; callers assign one with {@code setValue}.
+     */
+    public static ConstantExpression exprConst() {
+        ConstantExpression ret = new ConstantExpression(new OperatorKey("", r
+                .nextLong()));
         return ret;
     }
 
-    public static GreaterThanExpr compGreaterThanExpr(){
-        GreaterThanExpr ret = new GreaterThanExpr(new OperatorKey("",r.nextLong()));
+    /**
+     * Creates a GreaterThanExpr keyed by an OperatorKey built from an empty
+     * string and a random long drawn from {@code r}. Operands and operand
+     * type are left for the caller to set ({@code setLhs}, {@code setRhs},
+     * {@code setOperandType}).
+     */
+    public static GreaterThanExpr compGreaterThanExpr() {
+        GreaterThanExpr ret = new GreaterThanExpr(new OperatorKey("", r
+                .nextLong()));
         return ret;
     }
-    
-    public static POProject exprProject(){
-        POProject ret = new POProject(new OperatorKey("",r.nextLong()));
+
+    /**
+     * Creates a POProject keyed by an OperatorKey built from an empty string
+     * and a random long drawn from {@code r}. Column, result type and the
+     * overloaded flag are left for the caller to set.
+     */
+    public static POProject exprProject() {
+        POProject ret = new POProject(new OperatorKey("", r.nextLong()));
         return ret;
     }
-    
-    public static GTOrEqualToExpr compGTOrEqualToExpr(){
-        GTOrEqualToExpr ret = new GTOrEqualToExpr(new OperatorKey("",r.nextLong()));
+
+    /**
+     * Creates a GTOrEqualToExpr keyed by an OperatorKey built from an empty
+     * string and a random long drawn from {@code r}; no operands are set.
+     */
+    public static GTOrEqualToExpr compGTOrEqualToExpr() {
+        GTOrEqualToExpr ret = new GTOrEqualToExpr(new OperatorKey("", r
+                .nextLong()));
         return ret;
     }
-    
-    public static EqualToExpr compEqualToExpr(){
-        EqualToExpr ret = new EqualToExpr(new OperatorKey("",r.nextLong()));
+
+    /**
+     * Creates an EqualToExpr keyed by an OperatorKey built from an empty
+     * string and a random long drawn from {@code r}; no operands are set.
+     */
+    public static EqualToExpr compEqualToExpr() {
+        EqualToExpr ret = new EqualToExpr(new OperatorKey("", r.nextLong()));
         return ret;
     }
-    
-    public static NotEqualToExpr compNotEqualToExpr(){
-        NotEqualToExpr ret = new NotEqualToExpr(new OperatorKey("",r.nextLong()));
+
+    /**
+     * Builds a fresh NotEqualToExpr whose OperatorKey pairs an empty string
+     * with a random long taken from {@code r}.
+     *
+     * @return the new comparison operator; no operands are set here
+     */
+    public static NotEqualToExpr compNotEqualToExpr() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new NotEqualToExpr(key);
+    }
+
+    /**
+     * Builds a fresh LessThanExpr whose OperatorKey pairs an empty string
+     * with a random long taken from {@code r}.
+     *
+     * @return the new comparison operator; no operands are set here
+     */
+    public static LessThanExpr compLessThanExpr() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new LessThanExpr(key);
+    }
+
+    /**
+     * Builds a fresh LTOrEqualToExpr whose OperatorKey pairs an empty string
+     * with a random long taken from {@code r}.
+     *
+     * @return the new comparison operator; no operands are set here
+     */
+    public static LTOrEqualToExpr compLTOrEqualToExpr() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new LTOrEqualToExpr(key);
+    }
+
+    /**
+     * Creates a bare POLocalRearrange keyed by an OperatorKey built from an
+     * empty string and a random long drawn from {@code r}. Plan, index and
+     * result type are not set here.
+     */
+    public static POLocalRearrange topLocalRearrangeOp() {
+        POLocalRearrange ret = new POLocalRearrange(new OperatorKey("", r
+                .nextLong()));
         return ret;
     }
     
-    public static LessThanExpr compLessThanExpr(){
-        LessThanExpr ret = new LessThanExpr(new OperatorKey("",r.nextLong()));
+    /**
+     * Builds a bare POForEach whose OperatorKey pairs an empty string with a
+     * random long taken from {@code r}; its plan is not set here.
+     *
+     * @return the new POForEach operator
+     */
+    public static POForEach topForEachOp() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new POForEach(key);
+    }
+
+    /**
+     * Creates a POGenerate with the single-argument (key-only) constructor;
+     * unlike topGenerateOpWithExPlan, no expression plans or flatten flags
+     * are supplied. The key is built from an empty string and a random long
+     * drawn from {@code r}.
+     */
+    public static POGenerate topGenerateOp() {
+        POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()));
         return ret;
     }
     
-    public static LTOrEqualToExpr compLTOrEqualToExpr(){
-        LTOrEqualToExpr ret = new LTOrEqualToExpr(new OperatorKey("",r.nextLong()));
+    /**
+     * Creates the POGenerate operator for
+     * generate grpCol, *.
+     *
+     * The first expression plan projects grpCol. It is followed by one
+     * single-projection plan per column of sample, which expands the '*'.
+     * No plan is flattened.
+     *
+     * @param grpCol - The column to be grouped on
+     * @param sample - The sample tuple that is used to infer
+     *                  result types and #projects for *
+     * @return - The POGenerate operator which has the exprplan
+     *              for generate grpCol, * set.
+     * @throws IOException - declared for failures while inspecting sample
+     *              (NOTE(review): confirm which Tuple accessor declares it)
+     */
+    public static POGenerate topGenerateOpWithExPlan(int grpCol, Tuple sample) throws IOException {
+        // NOTE(review): -1 is POProject's second constructor argument,
+        // presumably "requested parallelism unspecified" - confirm.
+        POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol);
+        prj1.setResultType(sample.getType(grpCol));
+        prj1.setOverloaded(false);
+
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+
+        // Parallel lists as built here: one flatten flag per expression plan.
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        inputs.add(plan1);
+        toBeFlattened.add(false);
+
+        // Expand '*': one single-projection plan per column of the sample.
+        // An indexed loop is used because the projections only need to be
+        // added to their plans, not kept in an array.
+        int width = sample.size();
+        for (int col = 0; col < width; col++) {
+            POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, col);
+            project.setResultType(sample.getType(col));
+            project.setOverloaded(false);
+
+            ExprPlan pl = new ExprPlan();
+            pl.add(project);
+
+            inputs.add(pl);
+            toBeFlattened.add(false);
+        }
+
+        POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()),
+                inputs, toBeFlattened);
         return ret;
     }
     
-//    public static POLocalRearrange topLocalRearrangeOp(){
-//        POLocalRearrange ret = new POLocalRearrange(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
-//    
-//    public static POGenerate topGenerateOp(){
-//        POGenerate ret = new POGenerate(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
-//    
-   public static POLoad topLoadOp(){
-       POLoad ret = new POLoad(new OperatorKey("",r.nextLong()));
-       return ret;
-   }
+    /**
+     * Builds the POGenerate for a plain 'generate field': a single
+     * expression plan holding one non-flattened projection of column field.
+     *
+     * @param field - index of the column to generate
+     * @param sample - sample tuple whose type at field becomes the
+     *                  projection's result type
+     * @return - the POGenerate with that one-plan expression list set
+     * @throws IOException - declared for failures while inspecting sample
+     */
+    public static POGenerate topGenerateOpWithExPlanForFe(int field, Tuple sample) throws IOException {
+        POProject fieldProj = new POProject(new OperatorKey("", r.nextLong()), -1, field);
+        fieldProj.setResultType(sample.getType(field));
+        fieldProj.setOverloaded(false);
+
+        ExprPlan fieldPlan = new ExprPlan();
+        fieldPlan.add(fieldProj);
+
+        List<ExprPlan> plans = new LinkedList<ExprPlan>();
+        plans.add(fieldPlan);
+
+        List<Boolean> flatten = new LinkedList<Boolean>();
+        flatten.add(false);
+
+        return new POGenerate(new OperatorKey("", r.nextLong()), plans, flatten);
+    }
     
-    public static POFilter topFilterOp(){
-        POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
+    /**
+     * Builds a POLocalRearrange for "group by grpCol" on the input with the
+     * given index. Its nested physical plan holds the single POGenerate
+     * produced by topGenerateOpWithExPlan(grpCol, sample); its result type
+     * is DataType.TUPLE.
+     *
+     * @param index - input index handed to setIndex
+     * @param grpCol - the grouping column
+     * @param sample - sample tuple forwarded to topGenerateOpWithExPlan
+     * @return - the configured POLocalRearrange
+     * @throws IOException - propagated from topGenerateOpWithExPlan
+     */
+    public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws IOException{
+        POGenerate generate = topGenerateOpWithExPlan(grpCol, sample);
+        PhysicalPlan<PhysicalOperator> innerPlan =
+                new PhysicalPlan<PhysicalOperator>();
+        innerPlan.add(generate);
+
+        POLocalRearrange ret = topLocalRearrangeOp();
+        ret.setPlan(innerPlan);
+        ret.setIndex(index);
+        ret.setResultType(DataType.TUPLE);
         return ret;
     }
     
-    public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal) throws IOException{
-        POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
-        
+    /**
+     * Builds a POForEach for "foreach A generate field". Its nested physical
+     * plan holds the single POGenerate produced by
+     * topGenerateOpWithExPlanForFe(field, sample); its result type is
+     * DataType.TUPLE.
+     *
+     * @param field - the column to generate
+     * @param sample - sample tuple forwarded to topGenerateOpWithExPlanForFe
+     * @return - the configured POForEach
+     * @throws IOException - propagated from topGenerateOpWithExPlanForFe
+     */
+    public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws IOException{
+        POGenerate generate = topGenerateOpWithExPlanForFe(field, sample);
+        PhysicalPlan<PhysicalOperator> body =
+                new PhysicalPlan<PhysicalOperator>();
+        body.add(generate);
+
+        POForEach forEach = topForEachOp();
+        forEach.setPlan(body);
+        forEach.setResultType(DataType.TUPLE);
+        return forEach;
+    }
+
+    /**
+     * Builds a bare POLoad whose OperatorKey pairs an empty string with a
+     * random long taken from {@code r}.
+     *
+     * @return the new POLoad operator
+     */
+    public static POLoad topLoadOp() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new POLoad(key);
+    }
+
+    /**
+     * Builds a bare POFilter whose OperatorKey pairs an empty string with a
+     * random long taken from {@code r}; no plan is set here.
+     *
+     * @return the new POFilter operator
+     */
+    public static POFilter topFilterOp() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new POFilter(key);
+    }
+
+    /**
+     * Creates a POFilter whose expression plan is a GreaterThanExpr with
+     * constant lhsVal as its lhs and constant rhsVal as its rhs, operand
+     * type DataType.INTEGER. Because both operands are constants, the
+     * predicate reads no field of the input tuple.
+     *
+     * @param lhsVal - value of the left-hand constant
+     * @param rhsVal - value of the right-hand constant
+     * @return - the POFilter with its expression plan set
+     * @throws IOException - declared by this factory (NOTE(review):
+     *              presumably from plan construction - confirm)
+     */
+    public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal)
+            throws IOException {
+        POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+
         ConstantExpression ce1 = GenPhyOp.exprConst();
         ce1.setValue(lhsVal);
-        
+
         ConstantExpression ce2 = GenPhyOp.exprConst();
         ce2.setValue(rhsVal);
-        
+
         GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
         gr.setLhs(ce1);
         gr.setRhs(ce2);
         gr.setOperandType(DataType.INTEGER);
-        
+
+        // All three nodes are added to the plan before any edge is made.
         ExprPlan ep = new ExprPlan();
         ep.add(ce1);
         ep.add(ce2);
         ep.add(gr);
-        
+
+        // Edges ce1 -> gr and ce2 -> gr.
         ep.connect(ce1, gr);
         ep.connect(ce2, gr);
-        
+
         ret.setPlan(ep);
-        
+
         return ret;
     }
-    
-    public static POFilter topFilterOpWithProj(int col, int rhsVal) throws IOException{
-        POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
-        
+
+    /**
+     * Creates a POFilter whose expression plan is a GreaterThanExpr with a
+     * projection of column col (result type INTEGER, not overloaded) as its
+     * lhs and constant rhsVal as its rhs, operand type DataType.INTEGER.
+     *
+     * @param col - index of the projected column
+     * @param rhsVal - value of the right-hand constant
+     * @return - the POFilter with its expression plan set
+     * @throws IOException - declared by this factory (NOTE(review):
+     *              presumably from plan construction - confirm)
+     */
+    public static POFilter topFilterOpWithProj(int col, int rhsVal)
+            throws IOException {
+        POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+
         POProject proj = exprProject();
         proj.setResultType(DataType.INTEGER);
         proj.setColumn(col);
         proj.setOverloaded(false);
-        
+
         ConstantExpression ce2 = GenPhyOp.exprConst();
         ce2.setValue(rhsVal);
-        
+
         GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
         gr.setLhs(proj);
         gr.setRhs(ce2);
         gr.setOperandType(DataType.INTEGER);
-        
+
+        // All three nodes are added to the plan before any edge is made.
         ExprPlan ep = new ExprPlan();
         ep.add(proj);
         ep.add(ce2);
         ep.add(gr);
-        
+
+        // Edges proj -> gr and ce2 -> gr.
         ep.connect(proj, gr);
         ep.connect(ce2, gr);
-        
+
         ret.setPlan(ep);
-        
+
         return ret;
     }
-//    
-//    public static POGlobalRearrange topGlobalRearrangeOp(){
-//        POGlobalRearrange ret = new POGlobalRearrange(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
-//    
-//    public static POPackage topPackageOp(){
-//        POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
-//    
-   public static POStore topStoreOp(){
-       POStore ret = new POStore(new OperatorKey("",r.nextLong()));
-       return ret;
-   }
-//    
-//    public static StartMap topStartMapOp(){
-//        StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
+
+    //    
+    // public static POGlobalRearrange topGlobalRearrangeOp(){
+    // POGlobalRearrange ret = new POGlobalRearrange(new
+    // OperatorKey("",r.nextLong()));
+    // return ret;
+    // }
+    //    
+    // public static POPackage topPackageOp(){
+    // POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
+    // return ret;
+    // }
+    //    
+    /**
+     * Builds a bare POStore whose OperatorKey pairs an empty string with a
+     * random long taken from {@code r}.
+     *
+     * @return the new POStore operator
+     */
+    public static POStore topStoreOp() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        return new POStore(key);
+    }
+    //    
+    // public static StartMap topStartMapOp(){
+    // StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
+    //        return ret;
+    //    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Thu Apr 17 14:39:26 2008
@@ -17,9 +17,12 @@
  */
 package org.apache.pig.test.utils;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 
 /**
@@ -35,5 +38,37 @@
                 return true;
         }
         return false;
+    }
+    
+    /**
+     * Checks whether two bags hold the same tuples with the same
+     * multiplicities, ignoring order. Tuples are matched with
+     * {@code compareTo(...) == 0}; greedy matching is sufficient assuming
+     * that relation is an equivalence on tuples.
+     *
+     * @param db1 - first bag
+     * @param db2 - second bag
+     * @return true if the bags have equal size and every tuple of db2 can
+     *         be paired with a distinct, equal tuple of db1
+     */
+    public static boolean compareBags(DataBag db1, DataBag db2) {
+        if (db1.size() != db2.size())
+            return false;
+
+        // Snapshot db1 so each of its tuples can be consumed by at most one
+        // match; otherwise {a, a, b} and {a, b, b} would compare equal.
+        Tuple[] pool = new Tuple[(int) db1.size()];
+        int poolSize = 0;
+        for (Tuple t : db1) {
+            pool[poolSize++] = t;
+        }
+        boolean[] used = new boolean[poolSize];
+
+        for (Tuple tuple : db2) {
+            boolean contains = false;
+            for (int j = 0; j < poolSize; j++) {
+                if (!used[j] && tuple.compareTo(pool[j]) == 0) {
+                    used[j] = true;
+                    contains = true;
+                    break;
+                }
+            }
+            if (!contains)
+                return false;
+        }
+        return true;
+    }
+    
+    /**
+     * Projects field i out of every tuple of db2 into a new default bag of
+     * single-field tuples, one per input tuple.
+     *
+     * @param db2 - the bag to project
+     * @param i - index of the field to keep
+     * @return a new default bag holding the projected single-field tuples
+     * @throws IOException if one of the Tuple/DataBag operations used here
+     *         throws it
+     */
+    public static DataBag projectBag(DataBag db2, int i) throws IOException {
+        DataBag projected = DefaultBagFactory.getInstance().newDefaultBag();
+        Iterator<Tuple> it = db2.iterator();
+        while (it.hasNext()) {
+            Object field = it.next().get(i);
+            Tuple single = new DefaultTuple();
+            single.append(field);
+            projected.add(single);
+        }
+        return projected;
     }
 }