You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/13 23:11:24 UTC

svn commit: r656011 [5/5] - in /incubator/pig/branches/types: ./ lib/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/data/ src/org/apache/pig/impl/io/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/i...

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Tue May 13 14:11:21 2008
@@ -17,33 +17,42 @@
  */
 package org.apache.pig.test.utils;
 
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.MapReduceOper;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
-// import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
-// import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
-// import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
-// import
-// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
-// import
-// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Add;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Divide;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Mod;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Multiply;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Subtract;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
@@ -52,9 +61,17 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
 import org.apache.pig.impl.plan.PlanException;
 
-public class GenPhyOp {
+public class GenPhyOp{
     static Random r = new Random();
-
+    
+    public static final byte GTE = 1;
+    public static final byte GT = 2;
+    public static final byte LTE = 3;
+    public static final byte LT = 4;
+    
+    public static PigContext pc;
+//    public static MiniCluster cluster = MiniCluster.buildCluster();
+    
     public static ConstantExpression exprConst() {
         ConstantExpression ret = new ConstantExpression(new OperatorKey("", r
                 .nextLong()));
@@ -106,6 +123,17 @@
         return ret;
     }
     
+    /** Builds a bare POGlobalRearrange keyed with a random operator id. */
+    public static POGlobalRearrange topGlobalRearrangeOp(){
+        final OperatorKey key = new OperatorKey("", r.nextLong());
+        return new POGlobalRearrange(key);
+    }
+    
+    /** Builds a bare POPackage keyed with a random operator id. */
+    public static POPackage topPackageOp(){
+        return new POPackage(new OperatorKey("", r.nextLong()));
+    }
+    
     public static POForEach topForEachOp() {
         POForEach ret = new POForEach(new OperatorKey("", r
                 .nextLong()));
@@ -117,6 +145,11 @@
         return ret;
     }
     
+    /** Builds a bare POUnion keyed with a random operator id. */
+    public static POUnion topUnionOp() {
+        return new POUnion(new OperatorKey("", r.nextLong()));
+    }
+    
     /**
      * creates the POGenerate operator for 
      * generate grpCol, *.
@@ -168,6 +201,65 @@
     
     /**
      * creates the POGenerate operator for 
+     * generate grpCol, *.
+     * 
+     * @param grpCol - The column to be grouped on
+     * @param sample - The sample tuple that is used to infer
+     *                  result types and #projects for *
+     * @return - The POGenerate operator which has the exprplan
+     *              for generate grpCol, * set.
+     * @throws ExecException
+     * @throws PlanException 
+     */
+    public static POGenerate topGenerateOpWithExPlanLR(int grpCol, Tuple sample) throws ExecException, PlanException {
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+
+        // First generated expression: the grouping column.
+        inputs.add(castedProjectPlan(grpCol, sample));
+        toBeFlattened.add(false);
+
+        // Then one expression per column of the sample, i.e. the '*' part.
+        // A plain index loop replaces the former for-each over an array of
+        // nulls, which only served as a counter and left two arrays unused.
+        for (int col = 0; col < sample.size(); col++) {
+            inputs.add(castedProjectPlan(col, sample));
+            toBeFlattened.add(false);
+        }
+
+        // Nothing is flattened: every entry of toBeFlattened is false.
+        POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()),
+                inputs, toBeFlattened);
+        return ret;
+    }
+
+    /**
+     * Builds the expression plan 'project(col) -> cast' used by
+     * topGenerateOpWithExPlanLR. Both operators get the type that the
+     * sample tuple reports for the column.
+     *
+     * @param col - The column to project
+     * @param sample - The sample tuple that supplies the result type
+     * @return - The plan with the project connected to the cast
+     * @throws ExecException
+     * @throws PlanException
+     */
+    private static ExprPlan castedProjectPlan(int col, Tuple sample) throws ExecException, PlanException {
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, col);
+        prj.setResultType(sample.getType(col));
+        prj.setOverloaded(false);
+
+        POCast cst = new POCast(new OperatorKey("", r.nextLong()));
+        cst.setResultType(sample.getType(col));
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj);
+        plan.add(cst);
+        plan.connect(prj, cst);
+        return plan;
+    }
+    
+    /**
+     * creates the POGenerate operator for 
      * 'generate field'.
      * 
      * @param field - The column to be generated
@@ -200,6 +292,129 @@
     }
     
     /**
+     * creates the POGenerate operator for 
+     * 'generate flatten(field)'.
+     * 
+     * @param field - The column to be generated and flattened;
+     *                  its result type is fixed to bag, so no
+     *                  sample tuple is needed
+     * @return - The POGenerate operator which has the exprplan
+     *              for 'generate flatten(field)' set.
+     * @throws ExecException
+     */
+    public static POGenerate topGenerateOpWithExPlanForFeFlat(int field) throws ExecException {
+        // The projected column is declared as a bag.
+        POProject bagProject = new POProject(new OperatorKey("", r.nextLong()), -1, field);
+        bagProject.setResultType(DataType.BAG);
+        bagProject.setOverloaded(false);
+
+        // A single expression plan holding just the projection.
+        ExprPlan projectPlan = new ExprPlan();
+        projectPlan.add(bagProject);
+        List<ExprPlan> plans = new LinkedList<ExprPlan>();
+        plans.add(projectPlan);
+
+        // Exactly one flag, set to true: flatten the only generated column.
+        List<Boolean> flattenFlags = new LinkedList<Boolean>();
+        flattenFlags.add(true);
+
+        return new POGenerate(new OperatorKey("", r.nextLong()), plans, flattenFlags);
+    }
+    
+    /**
+     * creates the POGenerate operator for 
+     * 'generate fields[0] fields[1] ...'
+     * where each column is projected and then cast; nothing is flattened.
+     * 
+     * @param fields - The columns to be generated
+     * @param sample - The sample tuple that is used to infer
+     *                  result type
+     * @return - The POGenerate operator which has the exprplans
+     *              for 'generate fields[0] fields[1] ...' set.
+     * @throws ExecException
+     * @throws PlanException 
+     */
+    public static POGenerate topGenerateOpWithExPlanForFe(int[] fields, Tuple sample) throws ExecException, PlanException {
+        // Pass 1: one projection per requested column, typed from the sample.
+        // Every projection is keyed before any cast is, as before.
+        List<POProject> projections = new LinkedList<POProject>();
+        for (int column : fields) {
+            POProject projection = new POProject(new OperatorKey("", r.nextLong()), -1, column);
+            projection.setResultType(sample.getType(column));
+            projection.setOverloaded(false);
+            projections.add(projection);
+        }
+
+        // No generated column is flattened.
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        for (int i = 0; i < fields.length; i++) {
+            toBeFlattened.add(false);
+        }
+
+        // Pass 2: wrap every projection in its own plan, project -> cast,
+        // with the cast typed like the column it wraps.
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        int idx = 0;
+        for (POProject projection : projections) {
+            ExprPlan plan = new ExprPlan();
+            plan.add(projection);
+            POCast cast = new POCast(new OperatorKey("",r.nextLong()));
+            cast.setResultType(sample.getType(fields[idx++]));
+            plan.add(cast);
+            plan.connect(projection, cast);
+            inputs.add(plan);
+        }
+
+        return new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+    }
+    
+    /**
+     * creates the POGenerate operator for 
+     * 'generate fields[0] fields[1] ...'
+     * with the flatten list as specified
+     * @param fields - The columns to be generated
+     * @param sample - The sample tuple that is used to infer
+     *                  result type
+     * @param toBeFlattened - The flatten flags, handed to POGenerate as is
+     * @return - The POGenerate operator which has the exprplans
+     *              for 'generate fields[0] fields[1] ...' set.
+     * @throws ExecException
+     * @throws PlanException 
+     */
+    public static POGenerate topGenerateOpWithExPlanForFe(int[] fields, Tuple sample, List<Boolean> toBeFlattened) throws ExecException, PlanException {
+        // Pass 1: one projection per requested column, typed from the sample.
+        // Every projection is keyed before any cast is, as before.
+        List<POProject> projections = new LinkedList<POProject>();
+        for (int column : fields) {
+            POProject projection = new POProject(new OperatorKey("", r.nextLong()), -1, column);
+            projection.setResultType(sample.getType(column));
+            projection.setOverloaded(false);
+            projections.add(projection);
+        }
+
+        // The caller's toBeFlattened list is used as is: it is neither copied
+        // nor checked against fields.length here.
+
+        // Pass 2: wrap every projection in its own plan, project -> cast,
+        // with the cast typed like the column it wraps.
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        int idx = 0;
+        for (POProject projection : projections) {
+            ExprPlan plan = new ExprPlan();
+            plan.add(projection);
+            POCast cast = new POCast(new OperatorKey("",r.nextLong()));
+            cast.setResultType(sample.getType(fields[idx++]));
+            plan.add(cast);
+            plan.connect(projection, cast);
+            inputs.add(plan);
+        }
+
+        POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()),
+                inputs, toBeFlattened);
+        return ret;
+    }
+    
+    /**
      * creates the POLocalRearrange operator with the given index for
      * group by grpCol
      * @param index - The input index of this POLocalRearrange operator
@@ -208,7 +423,28 @@
      * @return - The POLocalRearrange operator
      * @throws ExecException
      */
-    public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException{
+    public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException, PlanException{
+        // Inner plan: the single POGenerate built by topGenerateOpWithExPlanLR.
+        PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+        innerPlan.add(topGenerateOpWithExPlanLR(grpCol, sample));
+
+        POLocalRearrange localRearrange = topLocalRearrangeOp();
+        localRearrange.setPlan(innerPlan);
+        localRearrange.setIndex(index);
+        localRearrange.setResultType(DataType.TUPLE);
+        return localRearrange;
+    }
+    
+    /**
+     * creates the POLocalRearrange operator with the given index for
+     * group by grpCol
+     * @param index - The input index of this POLocalRearrange operator
+     * @param grpCol - The column to be grouped on
+     * @param sample - Sample tuple needed for topGenerateOpWithExPlan
+     * @return - The POLocalRearrange operator
+     * @throws ExecException
+     */
+    public static POLocalRearrange topLocalRearrangeOPWithPlanPlain(int index, int grpCol, Tuple sample) throws ExecException, PlanException{
         POGenerate gen = topGenerateOpWithExPlan(grpCol, sample);
         PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>();
         pp.add(gen);
@@ -228,7 +464,7 @@
      * @return - The POForEach operator
      * @throws ExecException
      */
-    public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException{
+    public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException, PlanException{
         POGenerate gen = topGenerateOpWithExPlanForFe(field, sample);
         PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>();
         pp.add(gen);
@@ -239,8 +475,67 @@
         return ret;
     }
 
+    /**
+     * creates the POForEach operator for
+     * foreach A generate fields[0] fields[1] ...
+     * @param fields - The columns to be generated
+     * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
+     * @return - The POForEach operator, with result type tuple
+     * @throws ExecException
+     * @throws PlanException
+     */
+    public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample) throws ExecException, PlanException{
+        PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+        innerPlan.add(topGenerateOpWithExPlanForFe(fields, sample));
+
+        POForEach forEach = topForEachOp();
+        forEach.setPlan(innerPlan);
+        forEach.setResultType(DataType.TUPLE);
+        return forEach;
+    }
+    
+    /**
+     * creates the POForEach operator for
+     * foreach A generate fields[0] fields[1] ...
+     * @param fields - The columns to be generated
+     * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
+     * @param toBeFlattened - The flatten flags passed on to the generate operator
+     * @return - The POForEach operator, with result type tuple
+     * @throws ExecException
+     * @throws PlanException
+     */
+    public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample, List<Boolean> toBeFlattened) throws ExecException, PlanException{
+        PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+        innerPlan.add(topGenerateOpWithExPlanForFe(fields, sample, toBeFlattened));
+        POForEach forEach = topForEachOp();
+        forEach.setPlan(innerPlan);
+        forEach.setResultType(DataType.TUPLE);
+        return forEach;
+    }
+    
+    /**
+     * creates the POForEach operator for
+     * foreach A generate flatten(field)
+     * (built on topGenerateOpWithExPlanForFeFlat, so no sample tuple is taken)
+     * @param field - The column to be generated and flattened
+     * @return - The POForEach operator, with result type tuple
+     * @throws ExecException
+     * @throws PlanException
+     */
+    public static POForEach topForEachOPWithPlan(int field) throws ExecException, PlanException{
+        PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+        innerPlan.add(topGenerateOpWithExPlanForFeFlat(field));
+
+        POForEach forEach = topForEachOp();
+        forEach.setPlan(innerPlan);
+        forEach.setResultType(DataType.TUPLE);
+        return forEach;
+    }
+
+    
     public static POLoad topLoadOp() {
         POLoad ret = new POLoad(new OperatorKey("", r.nextLong()));
+        ret.setPc(pc); // shared static PigContext; still null unless it was assigned (e.g. via setPc)
         return ret;
     }
 
@@ -250,7 +545,7 @@
     }
 
     public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal)
-            throws PlanException {
+            throws ExecException, PlanException {
         POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
 
         ConstantExpression ce1 = GenPhyOp.exprConst();
@@ -278,7 +573,7 @@
     }
 
     public static POFilter topFilterOpWithProj(int col, int rhsVal)
-            throws PlanException {
+            throws ExecException, PlanException {
         POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
 
         POProject proj = exprProject();
@@ -306,26 +601,219 @@
 
         return ret;
     }
+    
+    /**
+     * creates the POFilter operator for 'filter by col OP rhsVal'
+     * where OP is selected by CompType and the operands are integers.
+     * @param col - The column projected as the left hand side
+     * @param rhsVal - The constant used as the right hand side
+     * @param CompType - One of GTE, GT, LTE or LT
+     * @return - The POFilter operator with the comparison plan set
+     * @throws IllegalArgumentException if CompType is none of the four constants
+     */
+    public static POFilter topFilterOpWithProj(int col, int rhsVal,
+            byte CompType) throws ExecException, PlanException {
+        POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+        POProject proj = exprProject();
+        proj.setResultType(DataType.INTEGER);
+        proj.setColumn(col);
+        proj.setOverloaded(false);
+
+        ConstantExpression ce2 = GenPhyOp.exprConst();
+        ce2.setValue(rhsVal);
+
+        ComparisonOperator cop;
+        switch (CompType) {
+        case GenPhyOp.GTE: cop = GenPhyOp.compGTOrEqualToExpr(); break;
+        case GenPhyOp.GT:  cop = GenPhyOp.compGreaterThanExpr(); break;
+        case GenPhyOp.LTE: cop = GenPhyOp.compLTOrEqualToExpr(); break;
+        case GenPhyOp.LT:  cop = GenPhyOp.compLessThanExpr(); break;
+        default:
+            // Previously cop stayed null here and setLhs below threw a
+            // NullPointerException with no hint about the cause.
+            throw new IllegalArgumentException("Unknown comparison type: " + CompType);
+        }
+        cop.setLhs(proj);
+        cop.setRhs(ce2);
+        cop.setOperandType(DataType.INTEGER);
+
+        ExprPlan ep = new ExprPlan();
+        ep.add(proj);
+        ep.add(ce2);
+        ep.add(cop);
+        ep.connect(proj, cop);
+        ep.connect(ce2, cop);
+        ret.setPlan(ep);
+        return ret;
+    }
+    
+    /**
+     * creates the POFilter operator for 'filter by cast(col) OP rhsVal'
+     * where OP is selected by CompType and the operands are integers.
+     * @param col - The column projected and fed through a POCast
+     * @param rhsVal - The constant used as the right hand side
+     * @param CompType - One of GTE, GT, LTE or LT
+     * @return - The POFilter operator with the comparison plan set
+     * @throws IllegalArgumentException if CompType is none of the four constants
+     */
+    public static POFilter topFilterOpWithProjWithCast(int col, int rhsVal, byte CompType)
+            throws ExecException, PlanException {
+        POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+        POProject proj = exprProject();
+        proj.setResultType(DataType.INTEGER);
+        proj.setColumn(col);
+        proj.setOverloaded(false);
+
+        ConstantExpression ce2 = GenPhyOp.exprConst();
+        ce2.setValue(rhsVal);
+
+        ComparisonOperator cop;
+        switch (CompType) {
+        case GenPhyOp.GTE: cop = GenPhyOp.compGTOrEqualToExpr(); break;
+        case GenPhyOp.GT:  cop = GenPhyOp.compGreaterThanExpr(); break;
+        case GenPhyOp.LTE: cop = GenPhyOp.compLTOrEqualToExpr(); break;
+        case GenPhyOp.LT:  cop = GenPhyOp.compLessThanExpr(); break;
+        default:
+            // Previously cop stayed null here and setLhs below threw a
+            // NullPointerException with no hint about the cause.
+            throw new IllegalArgumentException("Unknown comparison type: " + CompType);
+        }
+        POCast cst = new POCast(new OperatorKey("",r.nextLong()));
+        cop.setLhs(cst);
+        cop.setRhs(ce2);
+        cop.setOperandType(DataType.INTEGER);
+
+        ExprPlan ep = new ExprPlan();
+        ep.add(cst);
+        ep.add(proj);
+        ep.add(ce2);
+        ep.add(cop);
+        ep.connect(proj, cst);
+        ep.connect(cst, cop);
+        ep.connect(ce2, cop);
+        ret.setPlan(ep);
 
-    //    
-    // public static POGlobalRearrange topGlobalRearrangeOp(){
-    // POGlobalRearrange ret = new POGlobalRearrange(new
-    // OperatorKey("",r.nextLong()));
-    // return ret;
-    // }
-    //    
-    // public static POPackage topPackageOp(){
-    // POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
-    // return ret;
-    // }
-    //    
+        return ret;
+    }
+    
     public static POStore topStoreOp() {
         POStore ret = new POStore(new OperatorKey("", r.nextLong()));
+        ret.setPc(pc); // shared static PigContext; still null unless it was assigned (e.g. via setPc)
+        return ret;
+    }
+
+    public static void setR(Random r) {
+        GenPhyOp.r = r; // replaces the shared Random that supplies the long in every new OperatorKey
+    }
+    
+    /** Builds a bare MapReduceOper keyed with a random operator id. */
+    public static MapReduceOper MROp(){
+        return new MapReduceOper(new OperatorKey("",r.nextLong()));
+    }
+    
+    private static FileSpec getTempFileSpec() throws IOException { // temporary path obtained through pc, paired with the BinStorage class name
+        return new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),BinStorage.class.getName());
+    }
+    
+    public static POSplit topSplitOp() throws IOException{
+        POSplit ret = new POSplit(new OperatorKey("",r.nextLong()));
+        ret.setSplitStore(getTempFileSpec()); // split store is a temporary-path FileSpec using BinStorage
         return ret;
     }
-    //    
-    // public static StartMap topStartMapOp(){
-    // StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
-    //        return ret;
-    //    }
+    
+    /** Builds the unconfigured chain local rearrange -> global rearrange -> package. */
+    public static PhysicalPlan<PhysicalOperator> grpChain() throws ExecException, PlanException{
+        PhysicalPlan<PhysicalOperator> chain = new PhysicalPlan<PhysicalOperator>();
+        POLocalRearrange localRearrange = GenPhyOp.topLocalRearrangeOp();
+        POGlobalRearrange globalRearrange = GenPhyOp.topGlobalRearrangeOp();
+        POPackage pkg = GenPhyOp.topPackageOp();
+
+        chain.add(localRearrange);
+        chain.add(globalRearrange);
+        chain.add(pkg);
+
+        chain.connect(localRearrange, globalRearrange);
+        chain.connect(globalRearrange, pkg);
+        return chain;
+    }
+    
+    /** Builds the unconfigured chain load -> local rearrange -> global rearrange -> package. */
+    public static PhysicalPlan<PhysicalOperator> loadedGrpChain() throws ExecException, PlanException{
+        PhysicalPlan<PhysicalOperator> chain = new PhysicalPlan<PhysicalOperator>();
+        POLoad load = GenPhyOp.topLoadOp();
+        POLocalRearrange localRearrange = GenPhyOp.topLocalRearrangeOp();
+        POGlobalRearrange globalRearrange = GenPhyOp.topGlobalRearrangeOp();
+        POPackage pkg = GenPhyOp.topPackageOp();
+
+        chain.add(load);
+        chain.add(localRearrange);
+        chain.add(globalRearrange);
+        chain.add(pkg);
+
+        chain.connect(load, localRearrange);
+        chain.connect(localRearrange, globalRearrange);
+        chain.connect(globalRearrange, pkg);
+        return chain;
+    }
+    
+    /** Builds the two-operator plan load -> filter from topLoadOp and topFilterOp. */
+    public static PhysicalPlan<PhysicalOperator> loadedFilter() throws ExecException, PlanException{
+        PhysicalPlan<PhysicalOperator> chain = new PhysicalPlan<PhysicalOperator>();
+        POLoad load = GenPhyOp.topLoadOp();
+        POFilter filter = GenPhyOp.topFilterOp();
+        chain.add(load);
+        chain.add(filter);
+        chain.connect(load, filter);
+        return chain;
+    }
+    
+    /**
+     * Builds the arithmetic plan wired as ((c0 + c1) - (c2 / c3)) * ((c4 % c5) * c6),
+     * inputs listed in connection order, where ci is a constant with value i.
+     */
+    public static ExprPlan arithPlan() throws PlanException{
+        ExprPlan plan = new ExprPlan();
+        ConstantExpression[] consts = new ConstantExpression[7];
+        for (int value = 0; value < consts.length; value++) {
+            consts[value] = GenPhyOp.exprConst();
+            consts[value].setValue(value);
+            plan.add(consts[value]);
+        }
+        // Left factor: (c0 + c1) - (c2 / c3).
+        Add sum = new Add(getOK());
+        plan.add(sum);
+        plan.connect(consts[0], sum);
+        plan.connect(consts[1], sum);
+        Divide quotient = new Divide(getOK());
+        plan.add(quotient);
+        plan.connect(consts[2], quotient);
+        plan.connect(consts[3], quotient);
+        Subtract difference = new Subtract(getOK());
+        plan.add(difference);
+        plan.connect(sum, difference);
+        plan.connect(quotient, difference);
+        // Right factor: (c4 % c5) * c6.
+        Mod remainder = new Mod(getOK());
+        plan.add(remainder);
+        plan.connect(consts[4], remainder);
+        plan.connect(consts[5], remainder);
+        Multiply rightProduct = new Multiply(getOK());
+        plan.add(rightProduct);
+        plan.connect(remainder, rightProduct);
+        plan.connect(consts[6], rightProduct);
+        // Product of both factors.
+        Multiply total = new Multiply(getOK());
+        plan.add(total);
+        plan.connect(difference, total);
+        plan.connect(rightProduct, total);
+        return plan;
+    }
+    
+    private static OperatorKey getOK(){ // new key made of an empty string and the next long from r
+        return new OperatorKey("",r.nextLong());
+    }
+    
+    public static void setPc(PigContext pc) {
+        GenPhyOp.pc = pc; // context later handed to POLoad/POStore and used by getTempFileSpec
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Tue May 13 14:11:21 2008
@@ -17,13 +17,24 @@
  */
 package org.apache.pig.test.utils;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 
 /**
  * Will contain static methods that will be useful
@@ -31,6 +42,7 @@
  *
  */
 public class TestHelper {
+    public static int dispAfterNumTuples = 1000;
     public static boolean bagContains(DataBag db, Tuple t) {
         Iterator<Tuple> iter = db.iterator();
         for (Tuple tuple : db) {
@@ -43,7 +55,8 @@
     public static boolean compareBags(DataBag db1, DataBag db2) {
         if (db1.size() != db2.size())
             return false;
-
+        
+        int i=-1;
         boolean equal = true;
         for (Tuple tuple : db2) {
             boolean contains = false;
@@ -57,6 +70,8 @@
                 equal = false;
                 break;
             }
+            /*if(++i%dispAfterNumTuples==0)
+                System.out.println(i/dispAfterNumTuples);*/
         }
         return equal;
     }
@@ -71,4 +86,129 @@
         }
         return ret;
     }
+    
+    /** Returns a new bag whose tuples hold only the given fields, in the given order. */
+    public static DataBag projectBag(DataBag db2, int[] fields) throws ExecException {
+        DataBag projected = DefaultBagFactory.getInstance().newDefaultBag();
+        for (Tuple source : db2) {
+            Tuple narrowed = new DefaultTuple();
+            for (int k = 0; k < fields.length; k++) {
+                narrowed.append(source.get(fields[k]));
+            }
+            projected.add(narrowed);
+        }
+        return projected;
+    }
+    
+    /**
+     * Compares two streams byte by byte (bytes taken as 0..255) up to the end of both.
+     * @return 0 if both hold the same bytes; otherwise -1 or 1 depending on
+     *         whether exp is smaller/shorter or larger/longer than act.
+     */
+    public static int compareInputStreams(InputStream exp, InputStream act) throws IOException{
+        InputStream e = new java.io.BufferedInputStream(exp), a = new java.io.BufferedInputStream(act);
+        for (int x = e.read(), y = a.read(); x != -1 || y != -1; x = e.read(), y = a.read())
+            if (x != y)
+                return x < y ? -1 : 1;
+        return 0;
+    }
+    
+    /** Orders arrays by length first, then by the first differing (signed) byte. */
+    public static int compareByteArray(byte[] b1, byte[] b2){
+        if (b1.length != b2.length)
+            return b1.length > b2.length ? 1 : -1;
+        int idx = 0;
+        while (idx < b1.length) {
+            int diff = b1[idx] - b2[idx];
+            if (diff != 0)
+                return diff > 0 ? 1 : -1;
+            idx++;
+        }
+        return 0;
+    }
+    
+    /*public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc, int dispAftNumTuples) throws ExecException, IOException{
+        Random r = new Random();
+        
+        POLoad ldExp = new POLoad(new OperatorKey("", r.nextLong()));
+        ldExp.setPc(pc);
+        ldExp.setLFile(expLocal);
+        
+        POLoad ldAct = new POLoad(new OperatorKey("", r.nextLong()));
+        ldAct.setPc(pc);
+        ldAct.setLFile(actHadoop);
+        
+        Tuple t = null;
+        int numActTuples = -1;
+        boolean matches = true;
+        for(Result resAct=ldAct.getNext(t);resAct.returnStatus!=POStatus.STATUS_EOP;resAct=ldAct.getNext(t)){
+            Tuple tupAct = (Tuple)resAct.result;
+            ++numActTuples;
+            boolean found = false;
+            for(Result resExp=ldExp.getNext(t);resExp.returnStatus!=POStatus.STATUS_EOP;resExp=ldExp.getNext(t)){
+                Tuple tupExp = (Tuple)resExp.result;
+                if(tupAct.compareTo(tupExp)==0){
+                    found = true;
+                    ldExp.tearDown();
+                    break;
+                }
+            }
+            if(!found){
+                matches = false;
+                break;
+            }
+            if(numActTuples%dispAftNumTuples ==0)
+                System.out.println(numActTuples/dispAftNumTuples);
+        }
+        
+        int numExpTuples = -1;
+        while(ldExp.getNext(t).returnStatus!=POStatus.STATUS_EOP)
+            ++numExpTuples;
+        
+        return (matches && numActTuples==numExpTuples);
+    }*/
+    
+    /**
+     * Loads both files through POLoad, trims every field of every tuple and
+     * compares the two resulting bags with compareBags.
+     */
+    public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc) throws ExecException, IOException{
+        Random keyGen = new Random();
+        POLoad ldExp = new POLoad(new OperatorKey("", keyGen.nextLong()));
+        ldExp.setPc(pc);
+        ldExp.setLFile(expLocal);
+
+        POLoad ldAct = new POLoad(new OperatorKey("", keyGen.nextLong()));
+        ldAct.setPc(pc);
+        ldAct.setLFile(actHadoop);
+
+        // Drain the actual output first, then the expected file.
+        Tuple t = null;
+        DataBag bagAct = DefaultBagFactory.getInstance().newDefaultBag();
+        int numActTuples = -1;
+        for (Result res = ldAct.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = ldAct.getNext(t)) {
+            ++numActTuples;
+            bagAct.add(trimTuple((Tuple)res.result));
+        }
+
+        DataBag bagExp = DefaultBagFactory.getInstance().newDefaultBag();
+        int numExpTuples = -1;
+        for (Result res = ldExp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = ldExp.getNext(t)) {
+            ++numExpTuples;
+            bagExp.add(trimTuple((Tuple)res.result));
+        }
+
+        // Both counters start at -1, so they are only meaningful relative to each other.
+        return numActTuples == numExpTuples && compareBags(bagExp, bagAct);
+    }
+    
+    /** Copies t, trimming the text of every field; every field is cast to DataByteArray. */
+    private static Tuple trimTuple(Tuple t){
+        Tuple trimmed = TupleFactory.getInstance().newTuple();
+        for (Object field : t.getAll()) {
+            String text = ((DataByteArray)field).toString().trim();
+            trimmed.append(new DataByteArray(text.getBytes()));
+        }
+        return trimmed;
+    }
 }